You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by "platoneko (via GitHub)" <gi...@apache.org> on 2023/06/13 12:59:30 UTC

[GitHub] [doris] platoneko commented on a diff in pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

platoneko commented on code in PR #20715:
URL: https://github.com/apache/doris/pull/20715#discussion_r1228044539


##########
be/src/olap/cumulative_compaction_time_series_policy.h:
##########
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "olap/cumulative_compaction_policy.h"
+
+namespace doris {
+
+const static std::string CUMULATIVE_TIME_SERIES_POLICY = "TIME_SERIES";

Review Comment:
   
   ```suggestion
   inline std::string_view CUMULATIVE_TIME_SERIES_POLICY = "TIME_SERIES";
   ```
   I'd also prefer changing the `std::string name()` signature to `std::string_view name()`.



##########
be/src/olap/cumulative_compaction_time_series_policy.h:
##########
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "olap/cumulative_compaction_policy.h"
+
+namespace doris {
+
+const static std::string CUMULATIVE_TIME_SERIES_POLICY = "TIME_SERIES";
+
+/// TimeSeries cumulative compaction policy implementation.
+/// The following three conditions will be considered when calculating compaction scores and selecting input rowsets in this policy:
+/// Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
+/// Condition 2: the number of input files reaches the threshold specified by parameter _compaction_file_count_threshold
+/// Condition 3: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_seconds
+/// The conditions are evaluated sequentially, starting with Condition 1.
+/// If any condition is met, the compaction score calculation or selection of input rowsets will be successful.
+class TimeSeriesCumulativeCompactionPolicy final : public CumulativeCompactionPolicy {
+public:
+    TimeSeriesCumulativeCompactionPolicy(
+            int64_t compaction_goal_size = config::time_series_compaction_goal_size_mbytes * 1024 *
+                                           1024,
+            int64_t compaction_file_count_threshold =
+                    config::time_series_compaction_file_count_threshold,
+            int64_t compaction_time_threshold_seconds =
+                    config::time_series_compaction_time_threshold_seconds);
+    ~TimeSeriesCumulativeCompactionPolicy() {}
+
+    // Its main policy is calculating the accumulative compaction score after current cumulative_point in tablet.
+    uint32_t calc_cumulative_compaction_score(Tablet* tablet) override;
+
+    /// TimeSeris cumulative compaction policy implements calculate cumulative point function.
+    /// When the first time the tablet does compact, this calculation is executed. Its main policy is to find first rowset
+    /// which does not satisfied the _compaction_goal_size * 0.8.
+    /// The result of compaction may be slightly smaller than the _compaction_goal_size.
+    void calculate_cumulative_point(Tablet* tablet,

Review Comment:
   Consider rewriting this function's signature as `int64_t calculate_cumulative_point(Tablet* tablet)`, returning the computed point instead of writing it through the `cumulative_point` out-parameter.



##########
be/src/olap/cumulative_compaction_time_series_policy.h:
##########
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "olap/cumulative_compaction_policy.h"
+
+namespace doris {
+
+const static std::string CUMULATIVE_TIME_SERIES_POLICY = "TIME_SERIES";
+
+/// TimeSeries cumulative compaction policy implementation.
+/// The following three conditions will be considered when calculating compaction scores and selecting input rowsets in this policy:
+/// Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
+/// Condition 2: the number of input files reaches the threshold specified by parameter _compaction_file_count_threshold
+/// Condition 3: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_seconds
+/// The conditions are evaluated sequentially, starting with Condition 1.
+/// If any condition is met, the compaction score calculation or selection of input rowsets will be successful.
+class TimeSeriesCumulativeCompactionPolicy final : public CumulativeCompactionPolicy {
+public:
+    TimeSeriesCumulativeCompactionPolicy(
+            int64_t compaction_goal_size = config::time_series_compaction_goal_size_mbytes * 1024 *
+                                           1024,
+            int64_t compaction_file_count_threshold =
+                    config::time_series_compaction_file_count_threshold,
+            int64_t compaction_time_threshold_seconds =
+                    config::time_series_compaction_time_threshold_seconds);
+    ~TimeSeriesCumulativeCompactionPolicy() {}
+
+    // Its main policy is calculating the accumulative compaction score after current cumulative_point in tablet.
+    uint32_t calc_cumulative_compaction_score(Tablet* tablet) override;
+
+    /// TimeSeris cumulative compaction policy implements calculate cumulative point function.
+    /// When the first time the tablet does compact, this calculation is executed. Its main policy is to find first rowset
+    /// which does not satisfied the _compaction_goal_size * 0.8.
+    /// The result of compaction may be slightly smaller than the _compaction_goal_size.
+    void calculate_cumulative_point(Tablet* tablet,
+                                    const std::vector<RowsetMetaSharedPtr>& all_rowsets,
+                                    int64_t current_cumulative_point,
+                                    int64_t* cumulative_point) override;
+
+    /// Its main policy is picking rowsets from candidate rowsets by Condition 1, 2, 3.
+    int pick_input_rowsets(Tablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets,
+                           const int64_t max_compaction_score, const int64_t min_compaction_score,
+                           std::vector<RowsetSharedPtr>* input_rowsets,
+                           Version* last_delete_version, size_t* compaction_score) override;
+
+    /// The point must be updated after each cumulative compaction is completed.
+    /// We want each rowset to do cumulative compaction once.
+    void update_cumulative_point(Tablet* tablet, const std::vector<RowsetSharedPtr>& input_rowsets,

Review Comment:
   Consider rewriting this function's signature as `void update_cumulative_point(Tablet* tablet, const std::vector<RowsetSharedPtr>& input_rowsets, int64_t last_delete_version)`.



##########
be/src/olap/cumulative_compaction_time_series_policy.h:
##########
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "olap/cumulative_compaction_policy.h"
+
+namespace doris {
+
+const static std::string CUMULATIVE_TIME_SERIES_POLICY = "TIME_SERIES";
+
+/// TimeSeries cumulative compaction policy implementation.
+/// The following three conditions will be considered when calculating compaction scores and selecting input rowsets in this policy:
+/// Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
+/// Condition 2: the number of input files reaches the threshold specified by parameter _compaction_file_count_threshold
+/// Condition 3: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_seconds
+/// The conditions are evaluated sequentially, starting with Condition 1.
+/// If any condition is met, the compaction score calculation or selection of input rowsets will be successful.
+class TimeSeriesCumulativeCompactionPolicy final : public CumulativeCompactionPolicy {
+public:
+    TimeSeriesCumulativeCompactionPolicy(
+            int64_t compaction_goal_size = config::time_series_compaction_goal_size_mbytes * 1024 *
+                                           1024,
+            int64_t compaction_file_count_threshold =
+                    config::time_series_compaction_file_count_threshold,
+            int64_t compaction_time_threshold_seconds =
+                    config::time_series_compaction_time_threshold_seconds);
+    ~TimeSeriesCumulativeCompactionPolicy() {}
+
+    // Its main policy is calculating the accumulative compaction score after current cumulative_point in tablet.
+    uint32_t calc_cumulative_compaction_score(Tablet* tablet) override;
+
+    /// TimeSeris cumulative compaction policy implements calculate cumulative point function.
+    /// When the first time the tablet does compact, this calculation is executed. Its main policy is to find first rowset
+    /// which does not satisfied the _compaction_goal_size * 0.8.
+    /// The result of compaction may be slightly smaller than the _compaction_goal_size.
+    void calculate_cumulative_point(Tablet* tablet,
+                                    const std::vector<RowsetMetaSharedPtr>& all_rowsets,
+                                    int64_t current_cumulative_point,
+                                    int64_t* cumulative_point) override;
+
+    /// Its main policy is picking rowsets from candidate rowsets by Condition 1, 2, 3.
+    int pick_input_rowsets(Tablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets,

Review Comment:
   Consider rewriting this function's signature as `int pick_input_rowsets(Tablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets, Version* last_delete_version, size_t* compaction_score)`, and reading the config values directly in the function's implementation instead.



##########
be/src/olap/cumulative_compaction_time_series_policy.cpp:
##########
@@ -0,0 +1,251 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/cumulative_compaction_time_series_policy.h"
+
+#include "common/logging.h"
+#include "olap/tablet.h"
+#include "util/time.h"
+namespace doris {
+
+TimeSeriesCumulativeCompactionPolicy::TimeSeriesCumulativeCompactionPolicy(
+        int64_t compaction_goal_size, int64_t compaction_file_count_threshold,
+        int64_t compaction_time_threshold_seconds)
+        : _compaction_goal_size(compaction_goal_size),
+          _compaction_file_count_threshold(compaction_file_count_threshold),
+          _compaction_time_threshold_seconds(compaction_time_threshold_seconds) {}
+
+uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet* tablet) {
+    uint32_t score = 0;
+    bool base_rowset_exist = false;
+    const int64_t point = tablet->cumulative_layer_point();
+
+    int64_t total_size = 0;
+    RowsetMetaSharedPtr first_meta;
+    int64_t first_version = INT64_MAX;
+    // NOTE: tablet._meta_lock is hold
+    auto& rs_metas = tablet->tablet_meta()->all_rs_metas();
+    // check the base rowset and collect the rowsets of cumulative part
+    for (auto& rs_meta : rs_metas) {
+        if (rs_meta->start_version() < first_version) {
+            first_version = rs_meta->start_version();
+            first_meta = rs_meta;
+        }
+        // check base rowset
+        if (rs_meta->start_version() == 0) {
+            base_rowset_exist = true;
+        }
+        if (rs_meta->end_version() < point || !rs_meta->is_local()) {
+            // all_rs_metas() is not sorted, so we use _continue_ other than _break_ here.
+            continue;
+        } else {
+            // collect the rowsets of cumulative part
+            total_size += rs_meta->total_disk_size();
+            score += rs_meta->get_compaction_score();
+        }
+    }
+
+    if (first_meta == nullptr) {
+        return 0;
+    }
+
+    // If base version does not exist, but its state is RUNNING.
+    // It is abnormal, do not select it and set *score = 0
+    if (!base_rowset_exist && tablet->tablet_state() == TABLET_RUNNING) {
+        LOG(WARNING) << "tablet state is running but have no base version";
+        return 0;
+    }
+
+    // Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
+    if (total_size >= _compaction_goal_size) {
+        return score;
+    }
+
+    // Condition 2: the number of input files reaches the threshold specified by parameter _compaction_file_count_threshold
+    if (score >= _compaction_file_count_threshold) {
+        return score;
+    }
+
+    int64_t now = UnixMillis();
+    int64_t last_cumu = tablet->last_cumu_compaction_success_time();
+    if (last_cumu != 0) {
+        int64_t cumu_interval = now - last_cumu;
+
+        // Condition 3: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_second
+        if (cumu_interval > _compaction_time_threshold_seconds * 1000) {
+            return score;
+        }
+    } 
+
+    return 0;
+}
+
+void TimeSeriesCumulativeCompactionPolicy::calculate_cumulative_point(
+        Tablet* tablet, const std::vector<RowsetMetaSharedPtr>& all_metas,
+        int64_t current_cumulative_point, int64_t* ret_cumulative_point) {
+    *ret_cumulative_point = Tablet::K_INVALID_CUMULATIVE_POINT;
+    if (current_cumulative_point != Tablet::K_INVALID_CUMULATIVE_POINT) {
+        // only calculate the point once.
+        // after that, cumulative point will be updated along with compaction process.
+        return;
+    }
+    // empty return
+    if (all_metas.empty()) {
+        return;
+    }
+
+    std::list<RowsetMetaSharedPtr> existing_rss;
+    for (auto& rs : all_metas) {
+        existing_rss.emplace_back(rs);
+    }
+

Review Comment:
   
   ```suggestion
       auto rs_metas = tablet->all_rs_metas();
   ```



##########
be/src/olap/cumulative_compaction_time_series_policy.cpp:
##########
@@ -0,0 +1,251 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/cumulative_compaction_time_series_policy.h"
+
+#include "common/logging.h"
+#include "olap/tablet.h"
+#include "util/time.h"
+namespace doris {
+
+TimeSeriesCumulativeCompactionPolicy::TimeSeriesCumulativeCompactionPolicy(
+        int64_t compaction_goal_size, int64_t compaction_file_count_threshold,
+        int64_t compaction_time_threshold_seconds)
+        : _compaction_goal_size(compaction_goal_size),
+          _compaction_file_count_threshold(compaction_file_count_threshold),
+          _compaction_time_threshold_seconds(compaction_time_threshold_seconds) {}
+
+uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet* tablet) {
+    uint32_t score = 0;
+    bool base_rowset_exist = false;
+    const int64_t point = tablet->cumulative_layer_point();
+
+    int64_t total_size = 0;
+    RowsetMetaSharedPtr first_meta;
+    int64_t first_version = INT64_MAX;
+    // NOTE: tablet._meta_lock is hold
+    auto& rs_metas = tablet->tablet_meta()->all_rs_metas();
+    // check the base rowset and collect the rowsets of cumulative part
+    for (auto& rs_meta : rs_metas) {
+        if (rs_meta->start_version() < first_version) {
+            first_version = rs_meta->start_version();
+            first_meta = rs_meta;
+        }
+        // check base rowset
+        if (rs_meta->start_version() == 0) {
+            base_rowset_exist = true;
+        }
+        if (rs_meta->end_version() < point || !rs_meta->is_local()) {
+            // all_rs_metas() is not sorted, so we use _continue_ other than _break_ here.
+            continue;
+        } else {
+            // collect the rowsets of cumulative part
+            total_size += rs_meta->total_disk_size();
+            score += rs_meta->get_compaction_score();
+        }
+    }
+
+    if (first_meta == nullptr) {
+        return 0;
+    }
+
+    // If base version does not exist, but its state is RUNNING.
+    // It is abnormal, do not select it and set *score = 0
+    if (!base_rowset_exist && tablet->tablet_state() == TABLET_RUNNING) {
+        LOG(WARNING) << "tablet state is running but have no base version";
+        return 0;
+    }
+
+    // Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
+    if (total_size >= _compaction_goal_size) {
+        return score;
+    }
+
+    // Condition 2: the number of input files reaches the threshold specified by parameter _compaction_file_count_threshold
+    if (score >= _compaction_file_count_threshold) {
+        return score;
+    }
+
+    int64_t now = UnixMillis();
+    int64_t last_cumu = tablet->last_cumu_compaction_success_time();
+    if (last_cumu != 0) {
+        int64_t cumu_interval = now - last_cumu;
+
+        // Condition 3: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_second
+        if (cumu_interval > _compaction_time_threshold_seconds * 1000) {
+            return score;
+        }
+    } 
+
+    return 0;
+}
+
+void TimeSeriesCumulativeCompactionPolicy::calculate_cumulative_point(
+        Tablet* tablet, const std::vector<RowsetMetaSharedPtr>& all_metas,
+        int64_t current_cumulative_point, int64_t* ret_cumulative_point) {
+    *ret_cumulative_point = Tablet::K_INVALID_CUMULATIVE_POINT;
+    if (current_cumulative_point != Tablet::K_INVALID_CUMULATIVE_POINT) {
+        // only calculate the point once.
+        // after that, cumulative point will be updated along with compaction process.
+        return;
+    }
+    // empty return
+    if (all_metas.empty()) {
+        return;
+    }
+
+    std::list<RowsetMetaSharedPtr> existing_rss;
+    for (auto& rs : all_metas) {
+        existing_rss.emplace_back(rs);
+    }
+
+    // sort the existing rowsets by version in ascending order
+    existing_rss.sort([](const RowsetMetaSharedPtr& a, const RowsetMetaSharedPtr& b) {
+        // simple because 2 versions are certainly not overlapping
+        return a->version().first < b->version().first;
+    });
+
+    // calculate promotion size
+    auto base_rowset_meta = existing_rss.begin();
+
+    if (tablet->tablet_state() == TABLET_RUNNING) {
+        // check base rowset first version must be zero
+        // for tablet which state is not TABLET_RUNNING, there may not have base version.
+        CHECK((*base_rowset_meta)->start_version() == 0);
+

Review Comment:
   
   ```suggestion
       if (tablet->tablet_state() != TABLET_RUNNING) return;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org