Posted to commits@doris.apache.org by "csun5285 (via GitHub)" <gi...@apache.org> on 2023/06/12 10:01:34 UTC

[GitHub] [doris] csun5285 opened a new pull request, #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

csun5285 opened a new pull request, #20715:
URL: https://github.com/apache/doris/pull/20715

   time-series scenario cumulative compaction policy
   ## Proposed changes
   
   Issue Number: close #xxx
   
   <!--Describe your changes.-->
   
   ## Further comments
   
   If this is a relatively large or complex change, kick off the discussion at [dev@doris.apache.org](mailto:dev@doris.apache.org) by explaining why you chose the solution you did and what alternatives you considered, etc...
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] github-actions[bot] commented on pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #20715:
URL: https://github.com/apache/doris/pull/20715#issuecomment-1591070909

   PR approved by anyone and no changes requested.




[GitHub] [doris] csun5285 commented on a diff in pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "csun5285 (via GitHub)" <gi...@apache.org>.
csun5285 commented on code in PR #20715:
URL: https://github.com/apache/doris/pull/20715#discussion_r1227894981


##########
docs/zh-CN/docs/admin-manual/config/be-config.md:
##########
@@ -663,6 +663,32 @@ Metrics: {"filtered_rows":0,"input_row_num":3346807,"input_rowsets_count":42,"in
 * Description: minimum interval for updating peer replica infos
 * Default value: 10 (s)
 
+#### `enable_time_series_compaction_mode`
+
+* Type: bool
+* Description: In time-series scenarios, enable time series compaction to reduce write amplification and to keep long-running compaction from occupying a large amount of CPU
+  - When time series compaction is enabled, the parameters prefixed with time_series_compaction are used to tune how compaction runs
+* Default value: false
+
+#### `time_series_compaction_goal_size_mbytes`
+
+* Type: int64
+* Description: When time series compaction is enabled, this parameter controls the total size of the input files for each compaction; the output file may be slightly smaller than the configured value

Review Comment:
   done
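   
   A minimal C++ sketch (illustrative names only, not Doris code) of how the settings prefixed with time_series_compaction would feed the policy's thresholds; the MB-to-byte conversion matches the policy source quoted later in this thread, and the two extra fields correspond to config::time_series_compaction_file_count_threshold and config::time_series_compaction_time_threshold_seconds:
   
   ```cpp
   #include <cstdint>
   
   // Illustrative parameter bundle; field names mirror the BE configs documented above.
   struct TimeSeriesCompactionParams {
       int64_t goal_size_bytes;        // time_series_compaction_goal_size_mbytes * 1024 * 1024
       int64_t file_count_threshold;   // time_series_compaction_file_count_threshold
       int64_t time_threshold_seconds; // time_series_compaction_time_threshold_seconds
   };
   
   // Hypothetical helper: turn the documented config values into the thresholds the
   // policy compares against (size goal converted from MB to bytes, as in the BE code).
   inline TimeSeriesCompactionParams make_time_series_params(int64_t goal_size_mbytes,
                                                             int64_t file_count_threshold,
                                                             int64_t time_threshold_seconds) {
       return {goal_size_mbytes * 1024 * 1024, file_count_threshold, time_threshold_seconds};
   }
   ```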





[GitHub] [doris] csun5285 commented on a diff in pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "csun5285 (via GitHub)" <gi...@apache.org>.
csun5285 commented on code in PR #20715:
URL: https://github.com/apache/doris/pull/20715#discussion_r1228266075


##########
be/src/olap/cumulative_compaction_time_series_policy.cpp:
##########
@@ -0,0 +1,251 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/cumulative_compaction_time_series_policy.h"
+
+#include "common/logging.h"
+#include "olap/tablet.h"
+#include "util/time.h"
+namespace doris {
+
+TimeSeriesCumulativeCompactionPolicy::TimeSeriesCumulativeCompactionPolicy(
+        int64_t compaction_goal_size, int64_t compaction_file_count_threshold,
+        int64_t compaction_time_threshold_seconds)
+        : _compaction_goal_size(compaction_goal_size),
+          _compaction_file_count_threshold(compaction_file_count_threshold),
+          _compaction_time_threshold_seconds(compaction_time_threshold_seconds) {}
+
+uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet* tablet) {
+    uint32_t score = 0;
+    bool base_rowset_exist = false;
+    const int64_t point = tablet->cumulative_layer_point();
+
+    int64_t total_size = 0;
+    RowsetMetaSharedPtr first_meta;
+    int64_t first_version = INT64_MAX;
+    // NOTE: tablet._meta_lock is held
+    auto& rs_metas = tablet->tablet_meta()->all_rs_metas();
+    // check the base rowset and collect the rowsets of cumulative part
+    for (auto& rs_meta : rs_metas) {
+        if (rs_meta->start_version() < first_version) {
+            first_version = rs_meta->start_version();
+            first_meta = rs_meta;
+        }
+        // check base rowset
+        if (rs_meta->start_version() == 0) {
+            base_rowset_exist = true;
+        }
+        if (rs_meta->end_version() < point || !rs_meta->is_local()) {
+            // all_rs_metas() is not sorted, so we use _continue_ other than _break_ here.
+            continue;
+        } else {
+            // collect the rowsets of cumulative part
+            total_size += rs_meta->total_disk_size();
+            score += rs_meta->get_compaction_score();
+        }
+    }
+
+    if (first_meta == nullptr) {
+        return 0;
+    }
+
+    // If base version does not exist, but its state is RUNNING.
+    // It is abnormal, do not select it and set *score = 0
+    if (!base_rowset_exist && tablet->tablet_state() == TABLET_RUNNING) {
+        LOG(WARNING) << "tablet state is running but have no base version";
+        return 0;
+    }
+
+    // Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
+    if (total_size >= _compaction_goal_size) {
+        return score;
+    }
+
+    // Condition 2: the number of input files reaches the threshold specified by parameter _compaction_file_count_threshold
+    if (score >= _compaction_file_count_threshold) {
+        return score;
+    }
+
+    int64_t now = UnixMillis();
+    int64_t last_cumu = tablet->last_cumu_compaction_success_time();
+    if (last_cumu != 0) {
+        int64_t cumu_interval = now - last_cumu;
+
+        // Condition 3: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_second
+        if (cumu_interval > _compaction_time_threshold_seconds * 1000) {
+            return score;
+        }
+    }
+
+    return 0;
+}
+
+void TimeSeriesCumulativeCompactionPolicy::calculate_cumulative_point(
+        Tablet* tablet, const std::vector<RowsetMetaSharedPtr>& all_metas,
+        int64_t current_cumulative_point, int64_t* ret_cumulative_point) {
+    *ret_cumulative_point = Tablet::K_INVALID_CUMULATIVE_POINT;
+    if (current_cumulative_point != Tablet::K_INVALID_CUMULATIVE_POINT) {
+        // only calculate the point once.
+        // after that, cumulative point will be updated along with compaction process.
+        return;
+    }
+    // empty return
+    if (all_metas.empty()) {
+        return;
+    }
+
+    std::list<RowsetMetaSharedPtr> existing_rss;
+    for (auto& rs : all_metas) {
+        existing_rss.emplace_back(rs);
+    }
+
+    // sort the existing rowsets by version in ascending order
+    existing_rss.sort([](const RowsetMetaSharedPtr& a, const RowsetMetaSharedPtr& b) {
+        // simple because 2 versions are certainly not overlapping
+        return a->version().first < b->version().first;
+    });
+
+    // calculate promotion size
+    auto base_rowset_meta = existing_rss.begin();
+
+    if (tablet->tablet_state() == TABLET_RUNNING) {
+        // check base rowset first version must be zero
+        // for tablet which state is not TABLET_RUNNING, there may not have base version.
+        CHECK((*base_rowset_meta)->start_version() == 0);
+
+        int64_t prev_version = -1;
+        for (const RowsetMetaSharedPtr& rs : existing_rss) {
+            if (rs->version().first > prev_version + 1) {
+                // There is a hole, do not continue
+                break;
+            }
+
+            bool is_delete = rs->has_delete_predicate();
+
+            // break the loop if segments in this rowset is overlapping.
+            if (!is_delete && rs->is_segments_overlapping()) {
+                *ret_cumulative_point = rs->version().first;
+                break;
+            }
+
+            // check the rowset is whether less than _compaction_goal_size
+            // The result of compaction may be slightly smaller than the _compaction_goal_size.
+            if (!is_delete && rs->version().first != 0 &&
+                rs->total_disk_size() < (_compaction_goal_size * 0.8)) {
+                *ret_cumulative_point = rs->version().first;
+                break;
+            }
+
+            // include one situation: When the segment is not deleted, and is singleton delta, and is NONOVERLAPPING, ret_cumulative_point increase
+            prev_version = rs->version().second;
+            *ret_cumulative_point = prev_version + 1;
+        }
+        VLOG_NOTICE
+                << "cumulative compaction time serires policy, calculate cumulative point value = "
+                << *ret_cumulative_point << " tablet = " << tablet->full_name();
+    }
+}
+
+int TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
+        Tablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets,
+        const int64_t max_compaction_score, const int64_t min_compaction_score,
+        std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version,
+        size_t* compaction_score) {
+    if (tablet->tablet_state() == TABLET_NOTREADY) {
+        return 0;
+    }
+
+    int transient_size = 0;
+    *compaction_score = 0;
+    input_rowsets->clear();
+    int64_t total_size = 0;
+
+    for (auto& rowset : candidate_rowsets) {
+        // check whether this rowset is delete version
+        if (rowset->rowset_meta()->has_delete_predicate()) {
+            *last_delete_version = rowset->version();
+            if (!input_rowsets->empty()) {
+                // we meet a delete version, and there were other versions before.
+                // we should compact those version before handling them over to base compaction
+                break;
+            } else {
+                // we meet a delete version, and no other versions before, skip it and continue
+                input_rowsets->clear();
+                *compaction_score = 0;
+                transient_size = 0;
+                continue;
+            }
+        }
+
+        *compaction_score += rowset->rowset_meta()->get_compaction_score();
+        total_size += rowset->rowset_meta()->total_disk_size();
+
+        transient_size += 1;
+        input_rowsets->push_back(rowset);
+
+        // Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
+        if (total_size >= _compaction_goal_size) {
+            return transient_size;
+        }
+    }
+

Review Comment:
   This can happen when the first version after the cumu point is NONOVERLAPPING and the second version is a delete.
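   
   As a self-contained illustration of that case (the `FakeRowset` type and the version numbers are invented; the goal-size and score bookkeeping of the real loop is omitted), a sketch of how `pick_input_rowsets` ends up with a single non-overlapping input rowset in front of a delete version:
   
   ```cpp
   #include <cassert>
   #include <vector>
   
   // Minimal stand-in for a rowset: its version range plus the two flags the loop checks.
   struct FakeRowset {
       int start_version;
       int end_version;
       bool has_delete_predicate;
       bool segments_overlapping;
   };
   
   int main() {
       // Assume the cumulative point is 10; the candidate rowsets after it are:
       std::vector<FakeRowset> candidates = {
               {10, 10, /*delete*/ false, /*overlapping*/ false}, // becomes the only input rowset
               {11, 11, /*delete*/ true, /*overlapping*/ false},  // delete version: loop breaks here
       };
   
       std::vector<FakeRowset> input_rowsets;
       int last_delete_start = -1;
       for (const auto& rs : candidates) {
           if (rs.has_delete_predicate) {
               last_delete_start = rs.start_version;
               if (!input_rowsets.empty()) break; // compact the versions before the delete first
               continue;                          // a leading delete version is skipped
           }
           input_rowsets.push_back(rs);
       }
   
       // The situation described above: exactly one non-overlapping rowset ahead of a delete.
       assert(last_delete_start == 11);
       assert(input_rowsets.size() == 1 && !input_rowsets.front().segments_overlapping);
       return 0;
   }
   ```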





[GitHub] [doris] csun5285 commented on pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "csun5285 (via GitHub)" <gi...@apache.org>.
csun5285 commented on PR #20715:
URL: https://github.com/apache/doris/pull/20715#issuecomment-1590820564

   run buildall




[GitHub] [doris] github-actions[bot] commented on pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #20715:
URL: https://github.com/apache/doris/pull/20715#issuecomment-1591070839

   PR approved by at least one committer and no changes requested.




[GitHub] [doris] csun5285 commented on a diff in pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "csun5285 (via GitHub)" <gi...@apache.org>.
csun5285 commented on code in PR #20715:
URL: https://github.com/apache/doris/pull/20715#discussion_r1229108691


##########
be/src/olap/cumulative_compaction_time_series_policy.cpp:
##########
@@ -0,0 +1,251 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/cumulative_compaction_time_series_policy.h"
+
+#include "common/logging.h"
+#include "olap/tablet.h"
+#include "util/time.h"
+namespace doris {
+
+TimeSeriesCumulativeCompactionPolicy::TimeSeriesCumulativeCompactionPolicy(
+        int64_t compaction_goal_size, int64_t compaction_file_count_threshold,
+        int64_t compaction_time_threshold_seconds)
+        : _compaction_goal_size(compaction_goal_size),
+          _compaction_file_count_threshold(compaction_file_count_threshold),
+          _compaction_time_threshold_seconds(compaction_time_threshold_seconds) {}
+
+uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet* tablet) {
+    uint32_t score = 0;
+    bool base_rowset_exist = false;
+    const int64_t point = tablet->cumulative_layer_point();
+
+    int64_t total_size = 0;
+    RowsetMetaSharedPtr first_meta;
+    int64_t first_version = INT64_MAX;
+    // NOTE: tablet._meta_lock is held
+    auto& rs_metas = tablet->tablet_meta()->all_rs_metas();
+    // check the base rowset and collect the rowsets of cumulative part
+    for (auto& rs_meta : rs_metas) {
+        if (rs_meta->start_version() < first_version) {
+            first_version = rs_meta->start_version();
+            first_meta = rs_meta;
+        }
+        // check base rowset
+        if (rs_meta->start_version() == 0) {
+            base_rowset_exist = true;
+        }
+        if (rs_meta->end_version() < point || !rs_meta->is_local()) {
+            // all_rs_metas() is not sorted, so we use _continue_ other than _break_ here.
+            continue;
+        } else {
+            // collect the rowsets of cumulative part
+            total_size += rs_meta->total_disk_size();
+            score += rs_meta->get_compaction_score();
+        }
+    }
+
+    if (first_meta == nullptr) {
+        return 0;
+    }
+
+    // If base version does not exist, but its state is RUNNING.
+    // It is abnormal, do not select it and set *score = 0
+    if (!base_rowset_exist && tablet->tablet_state() == TABLET_RUNNING) {
+        LOG(WARNING) << "tablet state is running but have no base version";
+        return 0;
+    }
+
+    // Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
+    if (total_size >= _compaction_goal_size) {
+        return score;
+    }
+
+    // Condition 2: the number of input files reaches the threshold specified by parameter _compaction_file_count_threshold
+    if (score >= _compaction_file_count_threshold) {
+        return score;
+    }
+
+    int64_t now = UnixMillis();
+    int64_t last_cumu = tablet->last_cumu_compaction_success_time();
+    if (last_cumu != 0) {
+        int64_t cumu_interval = now - last_cumu;
+
+        // Condition 3: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_second
+        if (cumu_interval > _compaction_time_threshold_seconds * 1000) {
+            return score;
+        }
+    } 
+
+    return 0;
+}
+
+void TimeSeriesCumulativeCompactionPolicy::calculate_cumulative_point(
+        Tablet* tablet, const std::vector<RowsetMetaSharedPtr>& all_metas,
+        int64_t current_cumulative_point, int64_t* ret_cumulative_point) {
+    *ret_cumulative_point = Tablet::K_INVALID_CUMULATIVE_POINT;
+    if (current_cumulative_point != Tablet::K_INVALID_CUMULATIVE_POINT) {
+        // only calculate the point once.
+        // after that, cumulative point will be updated along with compaction process.
+        return;
+    }
+    // empty return
+    if (all_metas.empty()) {
+        return;
+    }
+
+    std::list<RowsetMetaSharedPtr> existing_rss;
+    for (auto& rs : all_metas) {
+        existing_rss.emplace_back(rs);
+    }
+
+    // sort the existing rowsets by version in ascending order
+    existing_rss.sort([](const RowsetMetaSharedPtr& a, const RowsetMetaSharedPtr& b) {
+        // simple because 2 versions are certainly not overlapping
+        return a->version().first < b->version().first;
+    });
+
+    // calculate promotion size
+    auto base_rowset_meta = existing_rss.begin();
+
+    if (tablet->tablet_state() == TABLET_RUNNING) {
+        // check base rowset first version must be zero
+        // for tablet which state is not TABLET_RUNNING, there may not have base version.
+        CHECK((*base_rowset_meta)->start_version() == 0);
+

Review Comment:
   done





[GitHub] [doris] csun5285 commented on pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "csun5285 (via GitHub)" <gi...@apache.org>.
csun5285 commented on PR #20715:
URL: https://github.com/apache/doris/pull/20715#issuecomment-1587160012

   run BE UT (Clang)




[GitHub] [doris] csun5285 commented on a diff in pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "csun5285 (via GitHub)" <gi...@apache.org>.
csun5285 commented on code in PR #20715:
URL: https://github.com/apache/doris/pull/20715#discussion_r1228100228


##########
be/src/olap/cumulative_compaction_time_series_policy.h:
##########
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "olap/cumulative_compaction_policy.h"
+
+namespace doris {
+
+const static std::string CUMULATIVE_TIME_SERIES_POLICY = "TIME_SERIES";

Review Comment:
   done





[GitHub] [doris] csun5285 commented on pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "csun5285 (via GitHub)" <gi...@apache.org>.
csun5285 commented on PR #20715:
URL: https://github.com/apache/doris/pull/20715#issuecomment-1590418707

   run p0




[GitHub] [doris] platoneko commented on a diff in pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "platoneko (via GitHub)" <gi...@apache.org>.
platoneko commented on code in PR #20715:
URL: https://github.com/apache/doris/pull/20715#discussion_r1229086473


##########
be/src/olap/cumulative_compaction_time_series_policy.cpp:
##########
@@ -0,0 +1,245 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/cumulative_compaction_time_series_policy.h"
+
+#include "common/logging.h"
+#include "olap/tablet.h"
+#include "util/time.h"
+namespace doris {
+
+uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet* tablet) {
+    uint32_t score = 0;
+    bool base_rowset_exist = false;
+    const int64_t point = tablet->cumulative_layer_point();
+
+    int64_t total_size = 0;
+    RowsetMetaSharedPtr first_meta;
+    int64_t first_version = INT64_MAX;
+    // NOTE: tablet._meta_lock is held
+    auto& rs_metas = tablet->tablet_meta()->all_rs_metas();
+    // check the base rowset and collect the rowsets of cumulative part
+    for (auto& rs_meta : rs_metas) {
+        if (rs_meta->start_version() < first_version) {
+            first_version = rs_meta->start_version();
+            first_meta = rs_meta;
+        }
+        // check base rowset
+        if (rs_meta->start_version() == 0) {
+            base_rowset_exist = true;
+        }
+        if (rs_meta->end_version() < point || !rs_meta->is_local()) {
+            // all_rs_metas() is not sorted, so we use _continue_ other than _break_ here.
+            continue;
+        } else {
+            // collect the rowsets of cumulative part
+            total_size += rs_meta->total_disk_size();
+            score += rs_meta->get_compaction_score();
+        }
+    }
+
+    if (first_meta == nullptr) {
+        return 0;
+    }
+
+    // If base version does not exist, but its state is RUNNING.
+    // It is abnormal, do not select it and set *score = 0
+    if (!base_rowset_exist && tablet->tablet_state() == TABLET_RUNNING) {
+        LOG(WARNING) << "tablet state is running but have no base version";
+        return 0;
+    }
+
+    // Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
+    if (total_size >= (config::time_series_compaction_goal_size_mbytes * 1024 * 1024)) {
+        return score;
+    }
+
+    // Condition 2: the number of input files reaches the threshold specified by parameter _compaction_file_count_threshold
+    if (score >= config::time_series_compaction_file_count_threshold) {
+        return score;
+    }
+
+    int64_t now = UnixMillis();
+    int64_t last_cumu = tablet->last_cumu_compaction_success_time();
+    if (last_cumu != 0) {
+        int64_t cumu_interval = now - last_cumu;
+
+        // Condition 3: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_second
+        if (cumu_interval > (config::time_series_compaction_time_threshold_seconds * 1000)) {
+            return score;
+        }
+    }
+
+    return 0;
+}
+
+void TimeSeriesCumulativeCompactionPolicy::calculate_cumulative_point(
+        Tablet* tablet, const std::vector<RowsetMetaSharedPtr>& all_metas,
+        int64_t current_cumulative_point, int64_t* ret_cumulative_point) {
+    *ret_cumulative_point = Tablet::K_INVALID_CUMULATIVE_POINT;
+    if (current_cumulative_point != Tablet::K_INVALID_CUMULATIVE_POINT) {
+        // only calculate the point once.
+        // after that, cumulative point will be updated along with compaction process.
+        return;
+    }
+    // empty return
+    if (all_metas.empty()) {
+        return;
+    }
+
+    std::list<RowsetMetaSharedPtr> existing_rss;
+    for (auto& rs : all_metas) {
+        existing_rss.emplace_back(rs);
+    }
+
+    // sort the existing rowsets by version in ascending order
+    existing_rss.sort([](const RowsetMetaSharedPtr& a, const RowsetMetaSharedPtr& b) {
+        // simple because 2 versions are certainly not overlapping
+        return a->version().first < b->version().first;
+    });
+
+    // calculate promotion size
+    auto base_rowset_meta = existing_rss.begin();
+
+    if (tablet->tablet_state() == TABLET_RUNNING) {
+        // check base rowset first version must be zero
+        // for tablet which state is not TABLET_RUNNING, there may not have base version.
+        CHECK((*base_rowset_meta)->start_version() == 0);
+
+        int64_t prev_version = -1;
+        for (const RowsetMetaSharedPtr& rs : existing_rss) {
+            if (rs->version().first > prev_version + 1) {
+                // There is a hole, do not continue
+                break;
+            }
+
+            bool is_delete = rs->has_delete_predicate();
+
+            // break the loop if segments in this rowset is overlapping.
+            if (!is_delete && rs->is_segments_overlapping()) {
+                *ret_cumulative_point = rs->version().first;
+                break;
+            }
+
+            // check the rowset is whether less than _compaction_goal_size
+            // The result of compaction may be slightly smaller than the _compaction_goal_size.
+            if (!is_delete && rs->version().first != 0 &&
+                rs->total_disk_size() <
+                        (config::time_series_compaction_goal_size_mbytes * 1024 * 1024 * 0.8)) {
+                *ret_cumulative_point = rs->version().first;
+                break;
+            }
+
+            // include one situation: When the segment is not deleted, and is singleton delta, and is NONOVERLAPPING, ret_cumulative_point increase
+            prev_version = rs->version().second;
+            *ret_cumulative_point = prev_version + 1;
+        }
+        VLOG_NOTICE
+                << "cumulative compaction time serires policy, calculate cumulative point value = "
+                << *ret_cumulative_point << " tablet = " << tablet->full_name();
+    }
+}
+
+int TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
+        Tablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets,
+        const int64_t max_compaction_score, const int64_t min_compaction_score,
+        std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version,
+        size_t* compaction_score) {
+    if (tablet->tablet_state() == TABLET_NOTREADY) {
+        return 0;
+    }
+
+    int transient_size = 0;
+    *compaction_score = 0;
+    input_rowsets->clear();
+    int64_t total_size = 0;
+
+    for (auto& rowset : candidate_rowsets) {
+        // check whether this rowset is delete version
+        if (rowset->rowset_meta()->has_delete_predicate()) {
+            *last_delete_version = rowset->version();
+            if (!input_rowsets->empty()) {
+                // we meet a delete version, and there were other versions before.
+                // we should compact those version before handling them over to base compaction
+                break;
+            } else {
+                // we meet a delete version, and no other versions before, skip it and continue
+                input_rowsets->clear();
+                *compaction_score = 0;
+                transient_size = 0;
+                continue;
+            }
+        }
+
+        *compaction_score += rowset->rowset_meta()->get_compaction_score();
+        total_size += rowset->rowset_meta()->total_disk_size();
+
+        transient_size += 1;
+        input_rowsets->push_back(rowset);
+
+        // Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
+        if (total_size >= (config::time_series_compaction_goal_size_mbytes * 1024 * 1024)) {
+            return transient_size;
+        }
+    }
+
+    // if there is delete version, do compaction directly
+    if (last_delete_version->first != -1) {
+        if (input_rowsets->size() == 1) {
+            auto rs_meta = input_rowsets->front()->rowset_meta();

Review Comment:
   
   ```suggestion
              if (input_rowsets->size() == 1 && !input_rowsets->front()->rowset_meta()->is_segments_overlapping())
   ```





[GitHub] [doris] csun5285 commented on pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "csun5285 (via GitHub)" <gi...@apache.org>.
csun5285 commented on PR #20715:
URL: https://github.com/apache/doris/pull/20715#issuecomment-1590344610

   run p0




[GitHub] [doris] platoneko commented on a diff in pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "platoneko (via GitHub)" <gi...@apache.org>.
platoneko commented on code in PR #20715:
URL: https://github.com/apache/doris/pull/20715#discussion_r1229032359


##########
be/src/olap/tablet.cpp:
##########
@@ -1062,10 +1063,18 @@ uint32_t Tablet::_calc_base_compaction_score() const {
             // all_rs_metas() is not sorted, so we use _continue_ other than _break_ here.
             continue;
         }
-
+        if (rs_meta->has_delete_predicate()) {
+            has_delete = true;
+        }
         score += rs_meta->get_compaction_score();
     }
 
+    // In the time series compaction policy, we want the base compaction to be triggered
+    // when there are delete versions present.
+    if (config::compaction_policy == std::string("time_series")) {

Review Comment:
   
   ```suggestion
       if (config::compaction_policy == "time_series") {
   ```
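   
   The explicit temporary is indeed unnecessary: std::string compares directly against a string literal through its const char* overload of operator==. A quick standalone check (the variable name is a stand-in for config::compaction_policy):
   
   ```cpp
   #include <cassert>
   #include <string>
   
   int main() {
       std::string compaction_policy = "time_series"; // stand-in for config::compaction_policy
       // Both comparisons are equivalent; the second just constructs an extra temporary std::string.
       assert(compaction_policy == "time_series");
       assert(compaction_policy == std::string("time_series"));
       return 0;
   }
   ```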





[GitHub] [doris] csun5285 commented on a diff in pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "csun5285 (via GitHub)" <gi...@apache.org>.
csun5285 commented on code in PR #20715:
URL: https://github.com/apache/doris/pull/20715#discussion_r1229116516


##########
be/src/olap/cumulative_compaction_time_series_policy.cpp:
##########
@@ -0,0 +1,245 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/cumulative_compaction_time_series_policy.h"
+
+#include "common/logging.h"
+#include "olap/tablet.h"
+#include "util/time.h"
+namespace doris {
+
+uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet* tablet) {
+    uint32_t score = 0;
+    bool base_rowset_exist = false;
+    const int64_t point = tablet->cumulative_layer_point();
+
+    int64_t total_size = 0;
+    RowsetMetaSharedPtr first_meta;
+    int64_t first_version = INT64_MAX;
+    // NOTE: tablet._meta_lock is held
+    auto& rs_metas = tablet->tablet_meta()->all_rs_metas();
+    // check the base rowset and collect the rowsets of cumulative part
+    for (auto& rs_meta : rs_metas) {
+        if (rs_meta->start_version() < first_version) {
+            first_version = rs_meta->start_version();
+            first_meta = rs_meta;
+        }
+        // check base rowset
+        if (rs_meta->start_version() == 0) {
+            base_rowset_exist = true;
+        }
+        if (rs_meta->end_version() < point || !rs_meta->is_local()) {
+            // all_rs_metas() is not sorted, so we use _continue_ other than _break_ here.
+            continue;
+        } else {
+            // collect the rowsets of cumulative part
+            total_size += rs_meta->total_disk_size();
+            score += rs_meta->get_compaction_score();
+        }
+    }
+
+    if (first_meta == nullptr) {
+        return 0;
+    }
+
+    // If base version does not exist, but its state is RUNNING.
+    // It is abnormal, do not select it and set *score = 0
+    if (!base_rowset_exist && tablet->tablet_state() == TABLET_RUNNING) {
+        LOG(WARNING) << "tablet state is running but have no base version";
+        return 0;
+    }
+
+    // Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
+    if (total_size >= (config::time_series_compaction_goal_size_mbytes * 1024 * 1024)) {
+        return score;
+    }
+
+    // Condition 2: the number of input files reaches the threshold specified by parameter _compaction_file_count_threshold
+    if (score >= config::time_series_compaction_file_count_threshold) {
+        return score;
+    }
+
+    int64_t now = UnixMillis();
+    int64_t last_cumu = tablet->last_cumu_compaction_success_time();
+    if (last_cumu != 0) {
+        int64_t cumu_interval = now - last_cumu;
+
+        // Condition 3: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_second
+        if (cumu_interval > (config::time_series_compaction_time_threshold_seconds * 1000)) {
+            return score;
+        }
+    }
+
+    return 0;
+}
+
+void TimeSeriesCumulativeCompactionPolicy::calculate_cumulative_point(
+        Tablet* tablet, const std::vector<RowsetMetaSharedPtr>& all_metas,
+        int64_t current_cumulative_point, int64_t* ret_cumulative_point) {
+    *ret_cumulative_point = Tablet::K_INVALID_CUMULATIVE_POINT;
+    if (current_cumulative_point != Tablet::K_INVALID_CUMULATIVE_POINT) {
+        // only calculate the point once.
+        // after that, cumulative point will be updated along with compaction process.
+        return;
+    }
+    // empty return
+    if (all_metas.empty()) {
+        return;
+    }
+
+    std::list<RowsetMetaSharedPtr> existing_rss;
+    for (auto& rs : all_metas) {
+        existing_rss.emplace_back(rs);
+    }
+
+    // sort the existing rowsets by version in ascending order
+    existing_rss.sort([](const RowsetMetaSharedPtr& a, const RowsetMetaSharedPtr& b) {
+        // simple because 2 versions are certainly not overlapping
+        return a->version().first < b->version().first;
+    });
+
+    // calculate promotion size
+    auto base_rowset_meta = existing_rss.begin();
+
+    if (tablet->tablet_state() == TABLET_RUNNING) {
+        // check base rowset first version must be zero
+        // for tablet which state is not TABLET_RUNNING, there may not have base version.
+        CHECK((*base_rowset_meta)->start_version() == 0);
+
+        int64_t prev_version = -1;
+        for (const RowsetMetaSharedPtr& rs : existing_rss) {
+            if (rs->version().first > prev_version + 1) {
+                // There is a hole, do not continue
+                break;
+            }
+
+            bool is_delete = rs->has_delete_predicate();
+
+            // break the loop if segments in this rowset is overlapping.
+            if (!is_delete && rs->is_segments_overlapping()) {
+                *ret_cumulative_point = rs->version().first;
+                break;
+            }
+
+            // check the rowset is whether less than _compaction_goal_size
+            // The result of compaction may be slightly smaller than the _compaction_goal_size.
+            if (!is_delete && rs->version().first != 0 &&
+                rs->total_disk_size() <
+                        (config::time_series_compaction_goal_size_mbytes * 1024 * 1024 * 0.8)) {
+                *ret_cumulative_point = rs->version().first;
+                break;
+            }
+
+            // include one situation: When the segment is not deleted, and is singleton delta, and is NONOVERLAPPING, ret_cumulative_point increase
+            prev_version = rs->version().second;
+            *ret_cumulative_point = prev_version + 1;
+        }
+        VLOG_NOTICE
+                << "cumulative compaction time serires policy, calculate cumulative point value = "
+                << *ret_cumulative_point << " tablet = " << tablet->full_name();
+    }
+}
+
+int TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
+        Tablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets,
+        const int64_t max_compaction_score, const int64_t min_compaction_score,
+        std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version,
+        size_t* compaction_score) {
+    if (tablet->tablet_state() == TABLET_NOTREADY) {
+        return 0;
+    }
+
+    int transient_size = 0;
+    *compaction_score = 0;
+    input_rowsets->clear();
+    int64_t total_size = 0;
+
+    for (auto& rowset : candidate_rowsets) {
+        // check whether this rowset is delete version
+        if (rowset->rowset_meta()->has_delete_predicate()) {
+            *last_delete_version = rowset->version();
+            if (!input_rowsets->empty()) {
+                // we meet a delete version, and there were other versions before.
+                // we should compact those version before handling them over to base compaction
+                break;
+            } else {
+                // we meet a delete version, and no other versions before, skip it and continue
+                input_rowsets->clear();
+                *compaction_score = 0;
+                transient_size = 0;
+                continue;
+            }
+        }
+
+        *compaction_score += rowset->rowset_meta()->get_compaction_score();
+        total_size += rowset->rowset_meta()->total_disk_size();
+
+        transient_size += 1;
+        input_rowsets->push_back(rowset);
+
+        // Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
+        if (total_size >= (config::time_series_compaction_goal_size_mbytes * 1024 * 1024)) {
+            return transient_size;
+        }
+    }
+
+    // if there is delete version, do compaction directly
+    if (last_delete_version->first != -1) {
+        if (input_rowsets->size() == 1) {
+            auto rs_meta = input_rowsets->front()->rowset_meta();

Review Comment:
   done



##########
be/src/olap/cumulative_compaction_time_series_policy.cpp:
##########
@@ -0,0 +1,245 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/cumulative_compaction_time_series_policy.h"
+
+#include "common/logging.h"
+#include "olap/tablet.h"
+#include "util/time.h"
+namespace doris {
+
+uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet* tablet) {
+    uint32_t score = 0;
+    bool base_rowset_exist = false;
+    const int64_t point = tablet->cumulative_layer_point();
+
+    int64_t total_size = 0;
+    RowsetMetaSharedPtr first_meta;
+    int64_t first_version = INT64_MAX;
+    // NOTE: tablet._meta_lock is held
+    auto& rs_metas = tablet->tablet_meta()->all_rs_metas();
+    // check the base rowset and collect the rowsets of cumulative part
+    for (auto& rs_meta : rs_metas) {
+        if (rs_meta->start_version() < first_version) {
+            first_version = rs_meta->start_version();
+            first_meta = rs_meta;
+        }
+        // check base rowset
+        if (rs_meta->start_version() == 0) {
+            base_rowset_exist = true;
+        }
+        if (rs_meta->end_version() < point || !rs_meta->is_local()) {
+            // all_rs_metas() is not sorted, so we use _continue_ other than _break_ here.
+            continue;
+        } else {
+            // collect the rowsets of cumulative part
+            total_size += rs_meta->total_disk_size();
+            score += rs_meta->get_compaction_score();
+        }
+    }
+
+    if (first_meta == nullptr) {
+        return 0;
+    }
+
+    // If base version does not exist, but its state is RUNNING.
+    // It is abnormal, do not select it and set *score = 0
+    if (!base_rowset_exist && tablet->tablet_state() == TABLET_RUNNING) {
+        LOG(WARNING) << "tablet state is running but have no base version";
+        return 0;
+    }
+
+    // Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
+    if (total_size >= (config::time_series_compaction_goal_size_mbytes * 1024 * 1024)) {
+        return score;
+    }
+
+    // Condition 2: the number of input files reaches the threshold specified by parameter _compaction_file_count_threshold
+    if (score >= config::time_series_compaction_file_count_threshold) {
+        return score;
+    }
+
+    int64_t now = UnixMillis();
+    int64_t last_cumu = tablet->last_cumu_compaction_success_time();
+    if (last_cumu != 0) {
+        int64_t cumu_interval = now - last_cumu;
+
+        // Condition 3: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_second
+        if (cumu_interval > (config::time_series_compaction_time_threshold_seconds * 1000)) {
+            return score;
+        }
+    }
+
+    return 0;
+}
+
+void TimeSeriesCumulativeCompactionPolicy::calculate_cumulative_point(
+        Tablet* tablet, const std::vector<RowsetMetaSharedPtr>& all_metas,
+        int64_t current_cumulative_point, int64_t* ret_cumulative_point) {
+    *ret_cumulative_point = Tablet::K_INVALID_CUMULATIVE_POINT;
+    if (current_cumulative_point != Tablet::K_INVALID_CUMULATIVE_POINT) {
+        // only calculate the point once.
+        // after that, cumulative point will be updated along with compaction process.
+        return;
+    }
+    // empty return
+    if (all_metas.empty()) {
+        return;
+    }
+
+    std::list<RowsetMetaSharedPtr> existing_rss;
+    for (auto& rs : all_metas) {
+        existing_rss.emplace_back(rs);
+    }
+
+    // sort the existing rowsets by version in ascending order
+    existing_rss.sort([](const RowsetMetaSharedPtr& a, const RowsetMetaSharedPtr& b) {
+        // simple because 2 versions are certainly not overlapping
+        return a->version().first < b->version().first;
+    });
+
+    // calculate promotion size
+    auto base_rowset_meta = existing_rss.begin();
+
+    if (tablet->tablet_state() == TABLET_RUNNING) {
+        // check base rowset first version must be zero
+        // for tablet which state is not TABLET_RUNNING, there may not have base version.
+        CHECK((*base_rowset_meta)->start_version() == 0);
+
+        int64_t prev_version = -1;
+        for (const RowsetMetaSharedPtr& rs : existing_rss) {
+            if (rs->version().first > prev_version + 1) {
+                // There is a hole, do not continue
+                break;
+            }
+
+            bool is_delete = rs->has_delete_predicate();
+
+            // break the loop if segments in this rowset is overlapping.
+            if (!is_delete && rs->is_segments_overlapping()) {
+                *ret_cumulative_point = rs->version().first;
+                break;
+            }
+
+            // check the rowset is whether less than _compaction_goal_size
+            // The result of compaction may be slightly smaller than the _compaction_goal_size.
+            if (!is_delete && rs->version().first != 0 &&
+                rs->total_disk_size() <
+                        (config::time_series_compaction_goal_size_mbytes * 1024 * 1024 * 0.8)) {
+                *ret_cumulative_point = rs->version().first;
+                break;
+            }
+
+            // include one situation: When the segment is not deleted, and is singleton delta, and is NONOVERLAPPING, ret_cumulative_point increase
+            prev_version = rs->version().second;
+            *ret_cumulative_point = prev_version + 1;
+        }
+        VLOG_NOTICE
+                << "cumulative compaction time serires policy, calculate cumulative point value = "
+                << *ret_cumulative_point << " tablet = " << tablet->full_name();
+    }
+}
+
+int TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
+        Tablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets,
+        const int64_t max_compaction_score, const int64_t min_compaction_score,
+        std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version,
+        size_t* compaction_score) {
+    if (tablet->tablet_state() == TABLET_NOTREADY) {
+        return 0;
+    }
+
+    int transient_size = 0;
+    *compaction_score = 0;
+    input_rowsets->clear();
+    int64_t total_size = 0;
+
+    for (auto& rowset : candidate_rowsets) {
+        // check whether this rowset is delete version
+        if (rowset->rowset_meta()->has_delete_predicate()) {
+            *last_delete_version = rowset->version();
+            if (!input_rowsets->empty()) {
+                // we meet a delete version, and there were other versions before.
+                // we should compact those version before handling them over to base compaction
+                break;
+            } else {
+                // we meet a delete version, and no other versions before, skip it and continue
+                input_rowsets->clear();
+                *compaction_score = 0;
+                transient_size = 0;
+                continue;
+            }
+        }
+
+        *compaction_score += rowset->rowset_meta()->get_compaction_score();
+        total_size += rowset->rowset_meta()->total_disk_size();
+
+        transient_size += 1;
+        input_rowsets->push_back(rowset);
+
+        // Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
+        if (total_size >= (config::time_series_compaction_goal_size_mbytes * 1024 * 1024)) {

Review Comment:
   done





[GitHub] [doris] csun5285 commented on a diff in pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "csun5285 (via GitHub)" <gi...@apache.org>.
csun5285 commented on code in PR #20715:
URL: https://github.com/apache/doris/pull/20715#discussion_r1228259846


##########
be/src/olap/cumulative_compaction_time_series_policy.cpp:
##########
@@ -0,0 +1,251 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/cumulative_compaction_time_series_policy.h"
+
+#include "common/logging.h"
+#include "olap/tablet.h"
+#include "util/time.h"
+namespace doris {
+
+TimeSeriesCumulativeCompactionPolicy::TimeSeriesCumulativeCompactionPolicy(
+        int64_t compaction_goal_size, int64_t compaction_file_count_threshold,
+        int64_t compaction_time_threshold_seconds)
+        : _compaction_goal_size(compaction_goal_size),
+          _compaction_file_count_threshold(compaction_file_count_threshold),
+          _compaction_time_threshold_seconds(compaction_time_threshold_seconds) {}
+
+uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet* tablet) {
+    uint32_t score = 0;
+    bool base_rowset_exist = false;
+    const int64_t point = tablet->cumulative_layer_point();
+
+    int64_t total_size = 0;
+    RowsetMetaSharedPtr first_meta;
+    int64_t first_version = INT64_MAX;
+    // NOTE: tablet._meta_lock is held
+    auto& rs_metas = tablet->tablet_meta()->all_rs_metas();
+    // check the base rowset and collect the rowsets of cumulative part
+    for (auto& rs_meta : rs_metas) {
+        if (rs_meta->start_version() < first_version) {
+            first_version = rs_meta->start_version();
+            first_meta = rs_meta;
+        }
+        // check base rowset
+        if (rs_meta->start_version() == 0) {
+            base_rowset_exist = true;
+        }
+        if (rs_meta->end_version() < point || !rs_meta->is_local()) {
+            // all_rs_metas() is not sorted, so we use _continue_ rather than _break_ here.
+            continue;
+        } else {
+            // collect the rowsets of cumulative part
+            total_size += rs_meta->total_disk_size();
+            score += rs_meta->get_compaction_score();
+        }
+    }
+
+    if (first_meta == nullptr) {
+        return 0;
+    }
+
+    // If the base version does not exist but the tablet state is RUNNING,
+    // it is abnormal; do not select it and set *score = 0
+    if (!base_rowset_exist && tablet->tablet_state() == TABLET_RUNNING) {
+        LOG(WARNING) << "tablet state is running but have no base version";
+        return 0;
+    }
+
+    // Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
+    if (total_size >= _compaction_goal_size) {
+        return score;
+    }
+
+    // Condition 2: the number of input files reaches the threshold specified by parameter _compaction_file_count_threshold
+    if (score >= _compaction_file_count_threshold) {
+        return score;
+    }
+
+    int64_t now = UnixMillis();
+    int64_t last_cumu = tablet->last_cumu_compaction_success_time();
+    if (last_cumu != 0) {
+        int64_t cumu_interval = now - last_cumu;
+
+        // Condition 3: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_seconds
+        if (cumu_interval > _compaction_time_threshold_seconds * 1000) {
+            return score;
+        }
+    } 
+
+    return 0;
+}
+
+void TimeSeriesCumulativeCompactionPolicy::calculate_cumulative_point(
+        Tablet* tablet, const std::vector<RowsetMetaSharedPtr>& all_metas,
+        int64_t current_cumulative_point, int64_t* ret_cumulative_point) {
+    *ret_cumulative_point = Tablet::K_INVALID_CUMULATIVE_POINT;
+    if (current_cumulative_point != Tablet::K_INVALID_CUMULATIVE_POINT) {
+        // only calculate the point once.
+        // after that, cumulative point will be updated along with compaction process.
+        return;
+    }
+    // empty return
+    if (all_metas.empty()) {
+        return;
+    }
+
+    std::list<RowsetMetaSharedPtr> existing_rss;
+    for (auto& rs : all_metas) {
+        existing_rss.emplace_back(rs);
+    }
+
+    // sort the existing rowsets by version in ascending order
+    existing_rss.sort([](const RowsetMetaSharedPtr& a, const RowsetMetaSharedPtr& b) {
+        // simple because 2 versions are certainly not overlapping
+        return a->version().first < b->version().first;
+    });
+
+    // calculate promotion size
+    auto base_rowset_meta = existing_rss.begin();
+
+    if (tablet->tablet_state() == TABLET_RUNNING) {
+        // check base rowset first version must be zero
+        // for a tablet whose state is not TABLET_RUNNING, there may not be a base version.
+        CHECK((*base_rowset_meta)->start_version() == 0);
+

Review Comment:
   The condition has already been evaluated earlier.



[GitHub] [doris] xiaokang merged pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "xiaokang (via GitHub)" <gi...@apache.org>.
xiaokang merged PR #20715:
URL: https://github.com/apache/doris/pull/20715


[GitHub] [doris] platoneko commented on a diff in pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "platoneko (via GitHub)" <gi...@apache.org>.
platoneko commented on code in PR #20715:
URL: https://github.com/apache/doris/pull/20715#discussion_r1229086473


##########
be/src/olap/cumulative_compaction_time_series_policy.cpp:
##########
@@ -0,0 +1,245 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/cumulative_compaction_time_series_policy.h"
+
+#include "common/logging.h"
+#include "olap/tablet.h"
+#include "util/time.h"
+namespace doris {
+
+uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet* tablet) {
+    uint32_t score = 0;
+    bool base_rowset_exist = false;
+    const int64_t point = tablet->cumulative_layer_point();
+
+    int64_t total_size = 0;
+    RowsetMetaSharedPtr first_meta;
+    int64_t first_version = INT64_MAX;
+    // NOTE: tablet._meta_lock is held
+    auto& rs_metas = tablet->tablet_meta()->all_rs_metas();
+    // check the base rowset and collect the rowsets of cumulative part
+    for (auto& rs_meta : rs_metas) {
+        if (rs_meta->start_version() < first_version) {
+            first_version = rs_meta->start_version();
+            first_meta = rs_meta;
+        }
+        // check base rowset
+        if (rs_meta->start_version() == 0) {
+            base_rowset_exist = true;
+        }
+        if (rs_meta->end_version() < point || !rs_meta->is_local()) {
+            // all_rs_metas() is not sorted, so we use _continue_ rather than _break_ here.
+            continue;
+        } else {
+            // collect the rowsets of cumulative part
+            total_size += rs_meta->total_disk_size();
+            score += rs_meta->get_compaction_score();
+        }
+    }
+
+    if (first_meta == nullptr) {
+        return 0;
+    }
+
+    // If the base version does not exist but the tablet state is RUNNING,
+    // it is abnormal; do not select it and set *score = 0
+    if (!base_rowset_exist && tablet->tablet_state() == TABLET_RUNNING) {
+        LOG(WARNING) << "tablet state is running but have no base version";
+        return 0;
+    }
+
+    // Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
+    if (total_size >= (config::time_series_compaction_goal_size_mbytes * 1024 * 1024)) {
+        return score;
+    }
+
+    // Condition 2: the number of input files reaches the threshold specified by parameter _compaction_file_count_threshold
+    if (score >= config::time_series_compaction_file_count_threshold) {
+        return score;
+    }
+
+    int64_t now = UnixMillis();
+    int64_t last_cumu = tablet->last_cumu_compaction_success_time();
+    if (last_cumu != 0) {
+        int64_t cumu_interval = now - last_cumu;
+
+        // Condition 3: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_seconds
+        if (cumu_interval > (config::time_series_compaction_time_threshold_seconds * 1000)) {
+            return score;
+        }
+    }
+
+    return 0;
+}
+
+void TimeSeriesCumulativeCompactionPolicy::calculate_cumulative_point(
+        Tablet* tablet, const std::vector<RowsetMetaSharedPtr>& all_metas,
+        int64_t current_cumulative_point, int64_t* ret_cumulative_point) {
+    *ret_cumulative_point = Tablet::K_INVALID_CUMULATIVE_POINT;
+    if (current_cumulative_point != Tablet::K_INVALID_CUMULATIVE_POINT) {
+        // only calculate the point once.
+        // after that, cumulative point will be updated along with compaction process.
+        return;
+    }
+    // empty return
+    if (all_metas.empty()) {
+        return;
+    }
+
+    std::list<RowsetMetaSharedPtr> existing_rss;
+    for (auto& rs : all_metas) {
+        existing_rss.emplace_back(rs);
+    }
+
+    // sort the existing rowsets by version in ascending order
+    existing_rss.sort([](const RowsetMetaSharedPtr& a, const RowsetMetaSharedPtr& b) {
+        // simple because 2 versions are certainly not overlapping
+        return a->version().first < b->version().first;
+    });
+
+    // calculate promotion size
+    auto base_rowset_meta = existing_rss.begin();
+
+    if (tablet->tablet_state() == TABLET_RUNNING) {
+        // check base rowset first version must be zero
+        // for a tablet whose state is not TABLET_RUNNING, there may not be a base version.
+        CHECK((*base_rowset_meta)->start_version() == 0);
+
+        int64_t prev_version = -1;
+        for (const RowsetMetaSharedPtr& rs : existing_rss) {
+            if (rs->version().first > prev_version + 1) {
+                // There is a hole, do not continue
+                break;
+            }
+
+            bool is_delete = rs->has_delete_predicate();
+
+            // break the loop if segments in this rowset is overlapping.
+            if (!is_delete && rs->is_segments_overlapping()) {
+                *ret_cumulative_point = rs->version().first;
+                break;
+            }
+
+            // check whether the rowset is smaller than _compaction_goal_size
+            // The result of compaction may be slightly smaller than the _compaction_goal_size.
+            if (!is_delete && rs->version().first != 0 &&
+                rs->total_disk_size() <
+                        (config::time_series_compaction_goal_size_mbytes * 1024 * 1024 * 0.8)) {
+                *ret_cumulative_point = rs->version().first;
+                break;
+            }
+
+            // covers one situation: when the rowset is not deleted, is a singleton delta, and is NONOVERLAPPING, ret_cumulative_point increases
+            prev_version = rs->version().second;
+            *ret_cumulative_point = prev_version + 1;
+        }
+        VLOG_NOTICE
+                << "cumulative compaction time serires policy, calculate cumulative point value = "
+                << *ret_cumulative_point << " tablet = " << tablet->full_name();
+    }
+}
+
+int TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
+        Tablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets,
+        const int64_t max_compaction_score, const int64_t min_compaction_score,
+        std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version,
+        size_t* compaction_score) {
+    if (tablet->tablet_state() == TABLET_NOTREADY) {
+        return 0;
+    }
+
+    int transient_size = 0;
+    *compaction_score = 0;
+    input_rowsets->clear();
+    int64_t total_size = 0;
+
+    for (auto& rowset : candidate_rowsets) {
+        // check whether this rowset is delete version
+        if (rowset->rowset_meta()->has_delete_predicate()) {
+            *last_delete_version = rowset->version();
+            if (!input_rowsets->empty()) {
+                // we meet a delete version, and there were other versions before.
+                // we should compact those versions before handing them over to base compaction
+                break;
+            } else {
+                // we meet a delete version, and no other versions before, skip it and continue
+                input_rowsets->clear();
+                *compaction_score = 0;
+                transient_size = 0;
+                continue;
+            }
+        }
+
+        *compaction_score += rowset->rowset_meta()->get_compaction_score();
+        total_size += rowset->rowset_meta()->total_disk_size();
+
+        transient_size += 1;
+        input_rowsets->push_back(rowset);
+
+        // Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
+        if (total_size >= (config::time_series_compaction_goal_size_mbytes * 1024 * 1024)) {
+            return transient_size;
+        }
+    }
+
+    // if there is delete version, do compaction directly
+    if (last_delete_version->first != -1) {
+        if (input_rowsets->size() == 1) {
+            auto rs_meta = input_rowsets->front()->rowset_meta();

Review Comment:
   
   ```suggestion
              if (input_rowsets->size() == 1 && input_rowsets->front()->rowset_meta()->is_segments_overlapping())
   ```



[GitHub] [doris] csun5285 commented on pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "csun5285 (via GitHub)" <gi...@apache.org>.
csun5285 commented on PR #20715:
URL: https://github.com/apache/doris/pull/20715#issuecomment-1587090224

   run buildall


[GitHub] [doris] platoneko commented on a diff in pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "platoneko (via GitHub)" <gi...@apache.org>.
platoneko commented on code in PR #20715:
URL: https://github.com/apache/doris/pull/20715#discussion_r1229083968


##########
be/src/olap/cumulative_compaction_time_series_policy.cpp:
##########
@@ -0,0 +1,251 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/cumulative_compaction_time_series_policy.h"
+
+#include "common/logging.h"
+#include "olap/tablet.h"
+#include "util/time.h"
+namespace doris {
+
+TimeSeriesCumulativeCompactionPolicy::TimeSeriesCumulativeCompactionPolicy(
+        int64_t compaction_goal_size, int64_t compaction_file_count_threshold,
+        int64_t compaction_time_threshold_seconds)
+        : _compaction_goal_size(compaction_goal_size),
+          _compaction_file_count_threshold(compaction_file_count_threshold),
+          _compaction_time_threshold_seconds(compaction_time_threshold_seconds) {}
+
+uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet* tablet) {
+    uint32_t score = 0;
+    bool base_rowset_exist = false;
+    const int64_t point = tablet->cumulative_layer_point();
+
+    int64_t total_size = 0;
+    RowsetMetaSharedPtr first_meta;
+    int64_t first_version = INT64_MAX;
+    // NOTE: tablet._meta_lock is held
+    auto& rs_metas = tablet->tablet_meta()->all_rs_metas();
+    // check the base rowset and collect the rowsets of cumulative part
+    for (auto& rs_meta : rs_metas) {
+        if (rs_meta->start_version() < first_version) {
+            first_version = rs_meta->start_version();
+            first_meta = rs_meta;
+        }
+        // check base rowset
+        if (rs_meta->start_version() == 0) {
+            base_rowset_exist = true;
+        }
+        if (rs_meta->end_version() < point || !rs_meta->is_local()) {
+            // all_rs_metas() is not sorted, so we use _continue_ rather than _break_ here.
+            continue;
+        } else {
+            // collect the rowsets of cumulative part
+            total_size += rs_meta->total_disk_size();
+            score += rs_meta->get_compaction_score();
+        }
+    }
+
+    if (first_meta == nullptr) {
+        return 0;
+    }
+
+    // If the base version does not exist but the tablet state is RUNNING,
+    // it is abnormal; do not select it and set *score = 0
+    if (!base_rowset_exist && tablet->tablet_state() == TABLET_RUNNING) {
+        LOG(WARNING) << "tablet state is running but have no base version";
+        return 0;
+    }
+
+    // Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
+    if (total_size >= _compaction_goal_size) {
+        return score;
+    }
+
+    // Condition 2: the number of input files reaches the threshold specified by parameter _compaction_file_count_threshold
+    if (score >= _compaction_file_count_threshold) {
+        return score;
+    }
+
+    int64_t now = UnixMillis();
+    int64_t last_cumu = tablet->last_cumu_compaction_success_time();
+    if (last_cumu != 0) {
+        int64_t cumu_interval = now - last_cumu;
+
+        // Condition 3: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_seconds
+        if (cumu_interval > _compaction_time_threshold_seconds * 1000) {
+            return score;
+        }
+    }
+
+    return 0;
+}
+
+void TimeSeriesCumulativeCompactionPolicy::calculate_cumulative_point(
+        Tablet* tablet, const std::vector<RowsetMetaSharedPtr>& all_metas,
+        int64_t current_cumulative_point, int64_t* ret_cumulative_point) {
+    *ret_cumulative_point = Tablet::K_INVALID_CUMULATIVE_POINT;
+    if (current_cumulative_point != Tablet::K_INVALID_CUMULATIVE_POINT) {
+        // only calculate the point once.
+        // after that, cumulative point will be updated along with compaction process.
+        return;
+    }
+    // empty return
+    if (all_metas.empty()) {
+        return;
+    }
+
+    std::list<RowsetMetaSharedPtr> existing_rss;
+    for (auto& rs : all_metas) {
+        existing_rss.emplace_back(rs);
+    }
+
+    // sort the existing rowsets by version in ascending order
+    existing_rss.sort([](const RowsetMetaSharedPtr& a, const RowsetMetaSharedPtr& b) {
+        // simple because 2 versions are certainly not overlapping
+        return a->version().first < b->version().first;
+    });
+
+    // calculate promotion size
+    auto base_rowset_meta = existing_rss.begin();
+
+    if (tablet->tablet_state() == TABLET_RUNNING) {
+        // check base rowset first version must be zero
+        // for a tablet whose state is not TABLET_RUNNING, there may not be a base version.
+        CHECK((*base_rowset_meta)->start_version() == 0);
+
+        int64_t prev_version = -1;
+        for (const RowsetMetaSharedPtr& rs : existing_rss) {
+            if (rs->version().first > prev_version + 1) {
+                // There is a hole, do not continue
+                break;
+            }
+
+            bool is_delete = rs->has_delete_predicate();
+
+            // break the loop if segments in this rowset is overlapping.
+            if (!is_delete && rs->is_segments_overlapping()) {
+                *ret_cumulative_point = rs->version().first;
+                break;
+            }
+
+            // check whether the rowset is smaller than _compaction_goal_size
+            // The result of compaction may be slightly smaller than the _compaction_goal_size.
+            if (!is_delete && rs->version().first != 0 &&
+                rs->total_disk_size() < (_compaction_goal_size * 0.8)) {
+                *ret_cumulative_point = rs->version().first;
+                break;
+            }
+
+            // covers one situation: when the rowset is not deleted, is a singleton delta, and is NONOVERLAPPING, ret_cumulative_point increases
+            prev_version = rs->version().second;
+            *ret_cumulative_point = prev_version + 1;
+        }
+        VLOG_NOTICE
+                << "cumulative compaction time serires policy, calculate cumulative point value = "
+                << *ret_cumulative_point << " tablet = " << tablet->full_name();
+    }
+}
+
+int TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
+        Tablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets,
+        const int64_t max_compaction_score, const int64_t min_compaction_score,
+        std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version,
+        size_t* compaction_score) {
+    if (tablet->tablet_state() == TABLET_NOTREADY) {
+        return 0;
+    }
+
+    int transient_size = 0;
+    *compaction_score = 0;
+    input_rowsets->clear();
+    int64_t total_size = 0;
+
+    for (auto& rowset : candidate_rowsets) {
+        // check whether this rowset is delete version
+        if (rowset->rowset_meta()->has_delete_predicate()) {
+            *last_delete_version = rowset->version();
+            if (!input_rowsets->empty()) {
+                // we meet a delete version, and there were other versions before.
+                // we should compact those versions before handing them over to base compaction
+                break;
+            } else {
+                // we meet a delete version, and no other versions before, skip it and continue
+                input_rowsets->clear();
+                *compaction_score = 0;
+                transient_size = 0;
+                continue;
+            }
+        }
+
+        *compaction_score += rowset->rowset_meta()->get_compaction_score();
+        total_size += rowset->rowset_meta()->total_disk_size();
+
+        transient_size += 1;
+        input_rowsets->push_back(rowset);
+
+        // Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
+        if (total_size >= _compaction_goal_size) {
+            return transient_size;
+        }
+    }
+

Review Comment:
   This is incorrect when loaded delta data has only 1 segment.



[GitHub] [doris] platoneko commented on a diff in pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "platoneko (via GitHub)" <gi...@apache.org>.
platoneko commented on code in PR #20715:
URL: https://github.com/apache/doris/pull/20715#discussion_r1229080485


##########
be/src/common/config.h:
##########
@@ -1004,6 +1004,17 @@ DECLARE_Int32(num_broadcast_buffer);
 // semi-structure configs
 DECLARE_Bool(enable_parse_multi_dimession_array);
 
+// Currently, two compaction strategies are implemented, SIZE_BASED and TIME_SERIES.
+// In the case of time series compaction, the execution of compaction is adjusted
+// using parameters that have the prefix time_series_compaction.
+DECLARE_mString(compaction_policy);

Review Comment:
   Verify config
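   For illustration, a minimal sketch of the kind of startup check this asks for is below. The helper name and the accepted strings ("size_based", "time_series") are assumptions made for this sketch, not the actual Doris config validation API.

   ```cpp
   #include <iostream>
   #include <string>

   // Illustrative only: reject unknown compaction_policy values at startup.
   // The accepted strings here are an assumption based on the two policies in this PR.
   bool is_valid_compaction_policy(const std::string& value) {
       return value == "size_based" || value == "time_series";
   }

   int main() {
       const std::string policy = "time_series"; // e.g. the value read from be.conf
       if (!is_valid_compaction_policy(policy)) {
           std::cerr << "invalid compaction_policy: " << policy
                     << ", expected size_based or time_series" << std::endl;
           return 1;
       }
       return 0;
   }
   ```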



[GitHub] [doris] platoneko commented on a diff in pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "platoneko (via GitHub)" <gi...@apache.org>.
platoneko commented on code in PR #20715:
URL: https://github.com/apache/doris/pull/20715#discussion_r1228053177


##########
be/src/olap/cumulative_compaction_time_series_policy.h:
##########
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "olap/cumulative_compaction_policy.h"
+
+namespace doris {
+
+const static std::string CUMULATIVE_TIME_SERIES_POLICY = "TIME_SERIES";
+
+/// TimeSeries cumulative compaction policy implementation.
+/// The following three conditions will be considered when calculating compaction scores and selecting input rowsets in this policy:
+/// Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
+/// Condition 2: the number of input files reaches the threshold specified by parameter _compaction_file_count_threshold
+/// Condition 3: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_seconds
+/// The conditions are evaluated sequentially, starting with Condition 1.
+/// If any condition is met, the compaction score calculation or selection of input rowsets will be successful.
+class TimeSeriesCumulativeCompactionPolicy final : public CumulativeCompactionPolicy {
+public:
+    TimeSeriesCumulativeCompactionPolicy(
+            int64_t compaction_goal_size = config::time_series_compaction_goal_size_mbytes * 1024 *
+                                           1024,
+            int64_t compaction_file_count_threshold =
+                    config::time_series_compaction_file_count_threshold,
+            int64_t compaction_time_threshold_seconds =
+                    config::time_series_compaction_time_threshold_seconds);
+    ~TimeSeriesCumulativeCompactionPolicy() {}
+
+    // Its main policy is calculating the accumulative compaction score after current cumulative_point in tablet.
+    uint32_t calc_cumulative_compaction_score(Tablet* tablet) override;
+
+/// The TimeSeries cumulative compaction policy implements the calculate-cumulative-point function.
+/// The first time the tablet compacts, this calculation is executed. Its main policy is to find the first rowset
+/// that does not satisfy _compaction_goal_size * 0.8.
+    /// The result of compaction may be slightly smaller than the _compaction_goal_size.
+    void calculate_cumulative_point(Tablet* tablet,
+                                    const std::vector<RowsetMetaSharedPtr>& all_rowsets,
+                                    int64_t current_cumulative_point,
+                                    int64_t* cumulative_point) override;
+
+    /// Its main policy is picking rowsets from candidate rowsets by Condition 1, 2, 3.
+    int pick_input_rowsets(Tablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets,

Review Comment:
   Consider rewriting this function's signature to `int pick_input_rowsets(Tablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets, Version* last_delete_version, size_t* compaction_score)`, and use the config values directly in the implementation.
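   For reference, a compilable sketch of the two shapes (current vs. proposed) is below. The `Tablet`, `Rowset` and `Version` types are stand-ins added only so the fragment is self-contained; the proposed declaration is copied from this comment.

   ```cpp
   #include <cstddef>
   #include <cstdint>
   #include <memory>
   #include <vector>

   // Stand-in types, only so this sketch compiles on its own.
   struct Tablet {};
   struct Rowset {};
   using RowsetSharedPtr = std::shared_ptr<Rowset>;
   struct Version { int64_t first = -1, second = -1; };

   // Current shape (quoted above): the caller passes in the score bounds.
   struct CurrentPolicy {
       int pick_input_rowsets(Tablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets,
                              int64_t max_compaction_score, int64_t min_compaction_score,
                              std::vector<RowsetSharedPtr>* input_rowsets,
                              Version* last_delete_version, size_t* compaction_score);
   };

   // Proposed shape: the thresholds come from config::time_series_* inside the implementation.
   struct ProposedPolicy {
       int pick_input_rowsets(Tablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets,
                              Version* last_delete_version, size_t* compaction_score);
   };

   int main() { return 0; }
   ```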



[GitHub] [doris] csun5285 commented on pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "csun5285 (via GitHub)" <gi...@apache.org>.
csun5285 commented on PR #20715:
URL: https://github.com/apache/doris/pull/20715#issuecomment-1589498148

   run buildall


[GitHub] [doris] platoneko commented on a diff in pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "platoneko (via GitHub)" <gi...@apache.org>.
platoneko commented on code in PR #20715:
URL: https://github.com/apache/doris/pull/20715#discussion_r1229090078


##########
be/src/olap/cumulative_compaction_time_series_policy.cpp:
##########
@@ -0,0 +1,245 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/cumulative_compaction_time_series_policy.h"
+
+#include "common/logging.h"
+#include "olap/tablet.h"
+#include "util/time.h"
+namespace doris {
+
+uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet* tablet) {
+    uint32_t score = 0;
+    bool base_rowset_exist = false;
+    const int64_t point = tablet->cumulative_layer_point();
+
+    int64_t total_size = 0;
+    RowsetMetaSharedPtr first_meta;
+    int64_t first_version = INT64_MAX;
+    // NOTE: tablet._meta_lock is held
+    auto& rs_metas = tablet->tablet_meta()->all_rs_metas();
+    // check the base rowset and collect the rowsets of cumulative part
+    for (auto& rs_meta : rs_metas) {
+        if (rs_meta->start_version() < first_version) {
+            first_version = rs_meta->start_version();
+            first_meta = rs_meta;
+        }
+        // check base rowset
+        if (rs_meta->start_version() == 0) {
+            base_rowset_exist = true;
+        }
+        if (rs_meta->end_version() < point || !rs_meta->is_local()) {
+            // all_rs_metas() is not sorted, so we use _continue_ rather than _break_ here.
+            continue;
+        } else {
+            // collect the rowsets of cumulative part
+            total_size += rs_meta->total_disk_size();
+            score += rs_meta->get_compaction_score();
+        }
+    }
+
+    if (first_meta == nullptr) {
+        return 0;
+    }
+
+    // If the base version does not exist but the tablet state is RUNNING,
+    // it is abnormal; do not select it and set *score = 0
+    if (!base_rowset_exist && tablet->tablet_state() == TABLET_RUNNING) {
+        LOG(WARNING) << "tablet state is running but have no base version";
+        return 0;
+    }
+
+    // Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
+    if (total_size >= (config::time_series_compaction_goal_size_mbytes * 1024 * 1024)) {
+        return score;
+    }
+
+    // Condition 2: the number of input files reaches the threshold specified by parameter _compaction_file_count_threshold
+    if (score >= config::time_series_compaction_file_count_threshold) {
+        return score;
+    }
+
+    int64_t now = UnixMillis();
+    int64_t last_cumu = tablet->last_cumu_compaction_success_time();
+    if (last_cumu != 0) {
+        int64_t cumu_interval = now - last_cumu;
+
+        // Condition 3: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_seconds
+        if (cumu_interval > (config::time_series_compaction_time_threshold_seconds * 1000)) {
+            return score;
+        }
+    }
+
+    return 0;
+}
+
+void TimeSeriesCumulativeCompactionPolicy::calculate_cumulative_point(
+        Tablet* tablet, const std::vector<RowsetMetaSharedPtr>& all_metas,
+        int64_t current_cumulative_point, int64_t* ret_cumulative_point) {
+    *ret_cumulative_point = Tablet::K_INVALID_CUMULATIVE_POINT;
+    if (current_cumulative_point != Tablet::K_INVALID_CUMULATIVE_POINT) {
+        // only calculate the point once.
+        // after that, cumulative point will be updated along with compaction process.
+        return;
+    }
+    // empty return
+    if (all_metas.empty()) {
+        return;
+    }
+
+    std::list<RowsetMetaSharedPtr> existing_rss;
+    for (auto& rs : all_metas) {
+        existing_rss.emplace_back(rs);
+    }
+
+    // sort the existing rowsets by version in ascending order
+    existing_rss.sort([](const RowsetMetaSharedPtr& a, const RowsetMetaSharedPtr& b) {
+        // simple because 2 versions are certainly not overlapping
+        return a->version().first < b->version().first;
+    });
+
+    // calculate promotion size
+    auto base_rowset_meta = existing_rss.begin();
+
+    if (tablet->tablet_state() == TABLET_RUNNING) {
+        // check base rowset first version must be zero
+        // for a tablet whose state is not TABLET_RUNNING, there may not be a base version.
+        CHECK((*base_rowset_meta)->start_version() == 0);
+
+        int64_t prev_version = -1;
+        for (const RowsetMetaSharedPtr& rs : existing_rss) {
+            if (rs->version().first > prev_version + 1) {
+                // There is a hole, do not continue
+                break;
+            }
+
+            bool is_delete = rs->has_delete_predicate();
+
+            // break the loop if segments in this rowset is overlapping.
+            if (!is_delete && rs->is_segments_overlapping()) {
+                *ret_cumulative_point = rs->version().first;
+                break;
+            }
+
+            // check whether the rowset is smaller than _compaction_goal_size
+            // The result of compaction may be slightly smaller than the _compaction_goal_size.
+            if (!is_delete && rs->version().first != 0 &&
+                rs->total_disk_size() <
+                        (config::time_series_compaction_goal_size_mbytes * 1024 * 1024 * 0.8)) {
+                *ret_cumulative_point = rs->version().first;
+                break;
+            }
+
+            // covers one situation: when the rowset is not deleted, is a singleton delta, and is NONOVERLAPPING, ret_cumulative_point increases
+            prev_version = rs->version().second;
+            *ret_cumulative_point = prev_version + 1;
+        }
+        VLOG_NOTICE
+                << "cumulative compaction time serires policy, calculate cumulative point value = "
+                << *ret_cumulative_point << " tablet = " << tablet->full_name();
+    }
+}
+
+int TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
+        Tablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets,
+        const int64_t max_compaction_score, const int64_t min_compaction_score,
+        std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version,
+        size_t* compaction_score) {
+    if (tablet->tablet_state() == TABLET_NOTREADY) {
+        return 0;
+    }
+
+    int transient_size = 0;
+    *compaction_score = 0;
+    input_rowsets->clear();
+    int64_t total_size = 0;
+
+    for (auto& rowset : candidate_rowsets) {
+        // check whether this rowset is delete version
+        if (rowset->rowset_meta()->has_delete_predicate()) {
+            *last_delete_version = rowset->version();
+            if (!input_rowsets->empty()) {
+                // we meet a delete version, and there were other versions before.
+                // we should compact those versions before handing them over to base compaction
+                break;
+            } else {
+                // we meet a delete version, and no other versions before, skip it and continue
+                input_rowsets->clear();
+                *compaction_score = 0;
+                transient_size = 0;
+                continue;
+            }
+        }
+
+        *compaction_score += rowset->rowset_meta()->get_compaction_score();
+        total_size += rowset->rowset_meta()->total_disk_size();
+
+        transient_size += 1;
+        input_rowsets->push_back(rowset);
+
+        // Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
+        if (total_size >= (config::time_series_compaction_goal_size_mbytes * 1024 * 1024)) {

Review Comment:
   
   ```suggestion
           if (total_size >= (config::time_series_compaction_goal_size_mbytes * 1024 * 1024)) {
               if (input_rowsets->size() == 1 && !input_rowsets->front()->rowset_meta()->is_segments_overlapping()) {
                   // Only 1 non-overlapping rowset, skip it
                   input_rowsets->clear();
                   *compaction_score = 0;
                   continue;
               }
   ```



[GitHub] [doris] csun5285 commented on pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "csun5285 (via GitHub)" <gi...@apache.org>.
csun5285 commented on PR #20715:
URL: https://github.com/apache/doris/pull/20715#issuecomment-1588445296

   run buildall


[GitHub] [doris] csun5285 commented on a diff in pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "csun5285 (via GitHub)" <gi...@apache.org>.
csun5285 commented on code in PR #20715:
URL: https://github.com/apache/doris/pull/20715#discussion_r1228023105


##########
be/src/olap/cumulative_compaction_time_series_policy.cpp:
##########
@@ -0,0 +1,255 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/cumulative_compaction_time_series_policy.h"
+
+#include "common/logging.h"
+#include "olap/tablet.h"
+#include "util/time.h"
+namespace doris {
+
+TimeSeriesCumulativeCompactionPolicy::TimeSeriesCumulativeCompactionPolicy(
+        int64_t compaction_goal_size, int64_t compaction_file_count_threshold,
+        int64_t compaction_time_threshold_seconds)
+        : _compaction_goal_size(compaction_goal_size),
+          _compaction_file_count_threshold(compaction_file_count_threshold),
+          _compaction_time_threshold_seconds(compaction_time_threshold_seconds) {}
+
+uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet* tablet) {
+    uint32_t score = 0;
+    bool base_rowset_exist = false;
+    const int64_t point = tablet->cumulative_layer_point();
+
+    int64_t total_size = 0;
+    RowsetMetaSharedPtr first_meta;
+    int64_t first_version = INT64_MAX;
+    // NOTE: tablet._meta_lock is held
+    auto& rs_metas = tablet->tablet_meta()->all_rs_metas();
+    // check the base rowset and collect the rowsets of cumulative part
+    for (auto& rs_meta : rs_metas) {
+        if (rs_meta->start_version() < first_version) {
+            first_version = rs_meta->start_version();
+            first_meta = rs_meta;
+        }
+        // check base rowset
+        if (rs_meta->start_version() == 0) {
+            base_rowset_exist = true;
+        }
+        if (rs_meta->end_version() < point || !rs_meta->is_local()) {
+            // all_rs_metas() is not sorted, so we use _continue_ rather than _break_ here.
+            continue;
+        } else {
+            // collect the rowsets of cumulative part
+            total_size += rs_meta->total_disk_size();
+            score += rs_meta->get_compaction_score();
+        }
+    }
+
+    if (first_meta == nullptr) {
+        return 0;
+    }
+
+    // If the base version does not exist but the tablet state is RUNNING,
+    // it is abnormal; do not select it and set *score = 0
+    if (!base_rowset_exist && tablet->tablet_state() == TABLET_RUNNING) {
+        LOG(WARNING) << "tablet state is running but have no base version";
+        return 0;
+    }
+
+    // Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
+    if (total_size >= _compaction_goal_size) {
+        return score;
+    }
+
+    // Condition 2: the number of input files reaches the threshold specified by parameter _compaction_file_count_threshold
+    if (score >= _compaction_file_count_threshold) {
+        return score;
+    }
+
+    int64_t now = UnixMillis();
+    int64_t last_cumu = tablet->last_cumu_compaction_success_time();
+    if (last_cumu != 0) {
+        int64_t cumu_interval = now - last_cumu;
+
+        // Condition 3: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_seconds
+        if (cumu_interval > _compaction_time_threshold_seconds * 1000) {
+            return score;
+        }
+    } else {
+        tablet->set_last_cumu_compaction_success_time(now);

Review Comment:
   I have deleted that code; the system continues to function properly.



[GitHub] [doris] platoneko commented on a diff in pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "platoneko (via GitHub)" <gi...@apache.org>.
platoneko commented on code in PR #20715:
URL: https://github.com/apache/doris/pull/20715#discussion_r1228187514


##########
be/src/olap/cumulative_compaction_time_series_policy.cpp:
##########
@@ -0,0 +1,251 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/cumulative_compaction_time_series_policy.h"
+
+#include "common/logging.h"
+#include "olap/tablet.h"
+#include "util/time.h"
+namespace doris {
+
+TimeSeriesCumulativeCompactionPolicy::TimeSeriesCumulativeCompactionPolicy(
+        int64_t compaction_goal_size, int64_t compaction_file_count_threshold,
+        int64_t compaction_time_threshold_seconds)
+        : _compaction_goal_size(compaction_goal_size),
+          _compaction_file_count_threshold(compaction_file_count_threshold),
+          _compaction_time_threshold_seconds(compaction_time_threshold_seconds) {}
+
+uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet* tablet) {
+    uint32_t score = 0;
+    bool base_rowset_exist = false;
+    const int64_t point = tablet->cumulative_layer_point();
+
+    int64_t total_size = 0;
+    RowsetMetaSharedPtr first_meta;
+    int64_t first_version = INT64_MAX;
+    // NOTE: tablet._meta_lock is held
+    auto& rs_metas = tablet->tablet_meta()->all_rs_metas();
+    // check the base rowset and collect the rowsets of cumulative part
+    for (auto& rs_meta : rs_metas) {
+        if (rs_meta->start_version() < first_version) {
+            first_version = rs_meta->start_version();
+            first_meta = rs_meta;
+        }
+        // check base rowset
+        if (rs_meta->start_version() == 0) {
+            base_rowset_exist = true;
+        }
+        if (rs_meta->end_version() < point || !rs_meta->is_local()) {
+            // all_rs_metas() is not sorted, so we use _continue_ rather than _break_ here.
+            continue;
+        } else {
+            // collect the rowsets of cumulative part
+            total_size += rs_meta->total_disk_size();
+            score += rs_meta->get_compaction_score();
+        }
+    }
+
+    if (first_meta == nullptr) {
+        return 0;
+    }
+
+    // If the base version does not exist but the tablet state is RUNNING,
+    // it is abnormal; do not select it and set *score = 0
+    if (!base_rowset_exist && tablet->tablet_state() == TABLET_RUNNING) {
+        LOG(WARNING) << "tablet state is running but have no base version";
+        return 0;
+    }
+
+    // Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
+    if (total_size >= _compaction_goal_size) {
+        return score;
+    }
+
+    // Condition 2: the number of input files reaches the threshold specified by parameter _compaction_file_count_threshold
+    if (score >= _compaction_file_count_threshold) {
+        return score;
+    }
+
+    int64_t now = UnixMillis();
+    int64_t last_cumu = tablet->last_cumu_compaction_success_time();
+    if (last_cumu != 0) {
+        int64_t cumu_interval = now - last_cumu;
+
+        // Condition 3: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_seconds
+        if (cumu_interval > _compaction_time_threshold_seconds * 1000) {
+            return score;
+        }
+    }
+
+    return 0;
+}
+
+void TimeSeriesCumulativeCompactionPolicy::calculate_cumulative_point(
+        Tablet* tablet, const std::vector<RowsetMetaSharedPtr>& all_metas,
+        int64_t current_cumulative_point, int64_t* ret_cumulative_point) {
+    *ret_cumulative_point = Tablet::K_INVALID_CUMULATIVE_POINT;
+    if (current_cumulative_point != Tablet::K_INVALID_CUMULATIVE_POINT) {
+        // only calculate the point once.
+        // after that, cumulative point will be updated along with compaction process.
+        return;
+    }
+    // empty return
+    if (all_metas.empty()) {
+        return;
+    }
+
+    std::list<RowsetMetaSharedPtr> existing_rss;
+    for (auto& rs : all_metas) {
+        existing_rss.emplace_back(rs);
+    }
+
+    // sort the existing rowsets by version in ascending order
+    existing_rss.sort([](const RowsetMetaSharedPtr& a, const RowsetMetaSharedPtr& b) {
+        // simple because 2 versions are certainly not overlapping
+        return a->version().first < b->version().first;
+    });
+
+    // calculate promotion size
+    auto base_rowset_meta = existing_rss.begin();
+
+    if (tablet->tablet_state() == TABLET_RUNNING) {
+        // check base rowset first version must be zero
+        // for a tablet whose state is not TABLET_RUNNING, there may not be a base version.
+        CHECK((*base_rowset_meta)->start_version() == 0);
+
+        int64_t prev_version = -1;
+        for (const RowsetMetaSharedPtr& rs : existing_rss) {
+            if (rs->version().first > prev_version + 1) {
+                // There is a hole, do not continue
+                break;
+            }
+
+            bool is_delete = rs->has_delete_predicate();
+
+            // break the loop if segments in this rowset is overlapping.
+            if (!is_delete && rs->is_segments_overlapping()) {
+                *ret_cumulative_point = rs->version().first;
+                break;
+            }
+
+            // check whether the rowset is smaller than _compaction_goal_size
+            // The result of compaction may be slightly smaller than the _compaction_goal_size.
+            if (!is_delete && rs->version().first != 0 &&
+                rs->total_disk_size() < (_compaction_goal_size * 0.8)) {
+                *ret_cumulative_point = rs->version().first;
+                break;
+            }
+
+            // covers one situation: when the rowset is not deleted, is a singleton delta, and is NONOVERLAPPING, ret_cumulative_point increases
+            prev_version = rs->version().second;
+            *ret_cumulative_point = prev_version + 1;
+        }
+        VLOG_NOTICE
+                << "cumulative compaction time serires policy, calculate cumulative point value = "
+                << *ret_cumulative_point << " tablet = " << tablet->full_name();
+    }
+}
+
+int TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
+        Tablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets,
+        const int64_t max_compaction_score, const int64_t min_compaction_score,
+        std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version,
+        size_t* compaction_score) {
+    if (tablet->tablet_state() == TABLET_NOTREADY) {
+        return 0;
+    }
+
+    int transient_size = 0;
+    *compaction_score = 0;
+    input_rowsets->clear();
+    int64_t total_size = 0;
+
+    for (auto& rowset : candidate_rowsets) {
+        // check whether this rowset is delete version
+        if (rowset->rowset_meta()->has_delete_predicate()) {
+            *last_delete_version = rowset->version();
+            if (!input_rowsets->empty()) {
+                // we meet a delete version, and there were other versions before.
+                // we should compact those versions before handing them over to base compaction
+                break;
+            } else {
+                // we meet a delete version, and no other versions before, skip it and continue
+                input_rowsets->clear();
+                *compaction_score = 0;
+                transient_size = 0;
+                continue;
+            }
+        }
+
+        *compaction_score += rowset->rowset_meta()->get_compaction_score();
+        total_size += rowset->rowset_meta()->total_disk_size();
+
+        transient_size += 1;
+        input_rowsets->push_back(rowset);
+
+        // Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
+        if (total_size >= _compaction_goal_size) {
+            return transient_size;
+        }
+    }
+

Review Comment:
   What if only 1 NONOVERLAPPING input rowset here?



[GitHub] [doris] csun5285 commented on a diff in pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "csun5285 (via GitHub)" <gi...@apache.org>.
csun5285 commented on code in PR #20715:
URL: https://github.com/apache/doris/pull/20715#discussion_r1229110969


##########
be/src/olap/tablet.cpp:
##########
@@ -1062,10 +1063,18 @@ uint32_t Tablet::_calc_base_compaction_score() const {
             // all_rs_metas() is not sorted, so we use _continue_ other than _break_ here.
             continue;
         }
-
+        if (rs_meta->has_delete_predicate()) {
+            has_delete = true;
+        }
         score += rs_meta->get_compaction_score();
     }
 
+    // In the time series compaction policy, we want the base compaction to be triggered
+    // when there are delete versions present.
+    if (config::compaction_policy == std::string("time_series")) {

Review Comment:
   done



[GitHub] [doris] platoneko commented on a diff in pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "platoneko (via GitHub)" <gi...@apache.org>.
platoneko commented on code in PR #20715:
URL: https://github.com/apache/doris/pull/20715#discussion_r1228044539


##########
be/src/olap/cumulative_compaction_time_series_policy.h:
##########
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "olap/cumulative_compaction_policy.h"
+
+namespace doris {
+
+const static std::string CUMULATIVE_TIME_SERIES_POLICY = "TIME_SERIES";

Review Comment:
   
   ```suggestion
   inline std::string_view CUMULATIVE_TIME_SERIES_POLICY = "TIME_SERIES";
   ```
   I prefer rewriting the `std::string name()` signature to `std::string_view name()`
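   If the signature does change, the matching accessor could be as small as the following sketch (assuming the base class declaration is updated to return `std::string_view` as well):

   ```cpp
   inline std::string_view CUMULATIVE_TIME_SERIES_POLICY = "TIME_SERIES";

   // sketch of the member in TimeSeriesCumulativeCompactionPolicy:
   // it just hands back the constant above
   std::string_view name() override { return CUMULATIVE_TIME_SERIES_POLICY; }
   ```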



##########
be/src/olap/cumulative_compaction_time_series_policy.h:
##########
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "olap/cumulative_compaction_policy.h"
+
+namespace doris {
+
+const static std::string CUMULATIVE_TIME_SERIES_POLICY = "TIME_SERIES";
+
+/// TimeSeries cumulative compaction policy implementation.
+/// The following three conditions will be considered when calculating compaction scores and selecting input rowsets in this policy:
+/// Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
+/// Condition 2: the number of input files reaches the threshold specified by parameter _compaction_file_count_threshold
+/// Condition 3: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_seconds
+/// The conditions are evaluated sequentially, starting with Condition 1.
+/// If any condition is met, the compaction score calculation or selection of input rowsets will be successful.
+class TimeSeriesCumulativeCompactionPolicy final : public CumulativeCompactionPolicy {
+public:
+    TimeSeriesCumulativeCompactionPolicy(
+            int64_t compaction_goal_size = config::time_series_compaction_goal_size_mbytes * 1024 *
+                                           1024,
+            int64_t compaction_file_count_threshold =
+                    config::time_series_compaction_file_count_threshold,
+            int64_t compaction_time_threshold_seconds =
+                    config::time_series_compaction_time_threshold_seconds);
+    ~TimeSeriesCumulativeCompactionPolicy() {}
+
+    // Its main policy is calculating the accumulative compaction score after current cumulative_point in tablet.
+    uint32_t calc_cumulative_compaction_score(Tablet* tablet) override;
+
+    /// TimeSeries cumulative compaction policy implements the calculate cumulative point function.
+    /// The first time the tablet does compaction, this calculation is executed. Its main policy is to find the first rowset
+    /// which does not satisfy the _compaction_goal_size * 0.8.
+    /// The result of compaction may be slightly smaller than the _compaction_goal_size.
+    void calculate_cumulative_point(Tablet* tablet,

Review Comment:
   Consider rewriting this function's signature to `int64_t calculate_cumulative_point(Tablet* tablet)`
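   A rough sketch of how the call site could read with a returning signature (names such as `_cumulative_compaction_policy` and `set_cumulative_layer_point()` are assumptions here, not verified against the current Tablet code):

   ```cpp
   // sketch: compute the point and return it instead of writing an out-parameter
   int64_t point = _cumulative_compaction_policy->calculate_cumulative_point(this);
   set_cumulative_layer_point(point);
   ```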



##########
be/src/olap/cumulative_compaction_time_series_policy.h:
##########
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "olap/cumulative_compaction_policy.h"
+
+namespace doris {
+
+const static std::string CUMULATIVE_TIME_SERIES_POLICY = "TIME_SERIES";
+
+/// TimeSeries cumulative compaction policy implementation.
+/// The following three conditions will be considered when calculating compaction scores and selecting input rowsets in this policy:
+/// Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
+/// Condition 2: the number of input files reaches the threshold specified by parameter _compaction_file_count_threshold
+/// Condition 3: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_seconds
+/// The conditions are evaluated sequentially, starting with Condition 1.
+/// If any condition is met, the compaction score calculation or selection of input rowsets will be successful.
+class TimeSeriesCumulativeCompactionPolicy final : public CumulativeCompactionPolicy {
+public:
+    TimeSeriesCumulativeCompactionPolicy(
+            int64_t compaction_goal_size = config::time_series_compaction_goal_size_mbytes * 1024 *
+                                           1024,
+            int64_t compaction_file_count_threshold =
+                    config::time_series_compaction_file_count_threshold,
+            int64_t compaction_time_threshold_seconds =
+                    config::time_series_compaction_time_threshold_seconds);
+    ~TimeSeriesCumulativeCompactionPolicy() {}
+
+    // Its main policy is calculating the accumulative compaction score after current cumulative_point in tablet.
+    uint32_t calc_cumulative_compaction_score(Tablet* tablet) override;
+
+    /// TimeSeries cumulative compaction policy implements the calculate cumulative point function.
+    /// The first time the tablet does compaction, this calculation is executed. Its main policy is to find the first rowset
+    /// which does not satisfy the _compaction_goal_size * 0.8.
+    /// The result of compaction may be slightly smaller than the _compaction_goal_size.
+    void calculate_cumulative_point(Tablet* tablet,
+                                    const std::vector<RowsetMetaSharedPtr>& all_rowsets,
+                                    int64_t current_cumulative_point,
+                                    int64_t* cumulative_point) override;
+
+    /// Its main policy is picking rowsets from candidate rowsets by Condition 1, 2, 3.
+    int pick_input_rowsets(Tablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets,
+                           const int64_t max_compaction_score, const int64_t min_compaction_score,
+                           std::vector<RowsetSharedPtr>* input_rowsets,
+                           Version* last_delete_version, size_t* compaction_score) override;
+
+    /// The point must be updated after each cumulative compaction is completed.
+    /// We want each rowset to do cumulative compaction once.
+    void update_cumulative_point(Tablet* tablet, const std::vector<RowsetSharedPtr>& input_rowsets,

Review Comment:
   Consider rewriting this function's signature to `void update_cumulative_point(Tablet* tablet, const std::vector<RowsetSharedPtr>& input_rowsets, int64_t last_delete_version)`



##########
be/src/olap/cumulative_compaction_time_series_policy.h:
##########
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "olap/cumulative_compaction_policy.h"
+
+namespace doris {
+
+const static std::string CUMULATIVE_TIME_SERIES_POLICY = "TIME_SERIES";
+
+/// TimeSeries cumulative compaction policy implementation.
+/// The following three conditions will be considered when calculating compaction scores and selecting input rowsets in this policy:
+/// Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
+/// Condition 2: the number of input files reaches the threshold specified by parameter _compaction_file_count_threshold
+/// Condition 3: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_seconds
+/// The conditions are evaluated sequentially, starting with Condition 1.
+/// If any condition is met, the compaction score calculation or selection of input rowsets will be successful.
+class TimeSeriesCumulativeCompactionPolicy final : public CumulativeCompactionPolicy {
+public:
+    TimeSeriesCumulativeCompactionPolicy(
+            int64_t compaction_goal_size = config::time_series_compaction_goal_size_mbytes * 1024 *
+                                           1024,
+            int64_t compaction_file_count_threshold =
+                    config::time_series_compaction_file_count_threshold,
+            int64_t compaction_time_threshold_seconds =
+                    config::time_series_compaction_time_threshold_seconds);
+    ~TimeSeriesCumulativeCompactionPolicy() {}
+
+    // Its main policy is calculating the accumulative compaction score after current cumulative_point in tablet.
+    uint32_t calc_cumulative_compaction_score(Tablet* tablet) override;
+
+    /// TimeSeries cumulative compaction policy implements the calculate cumulative point function.
+    /// The first time the tablet does compaction, this calculation is executed. Its main policy is to find the first rowset
+    /// which does not satisfy the _compaction_goal_size * 0.8.
+    /// The result of compaction may be slightly smaller than the _compaction_goal_size.
+    void calculate_cumulative_point(Tablet* tablet,
+                                    const std::vector<RowsetMetaSharedPtr>& all_rowsets,
+                                    int64_t current_cumulative_point,
+                                    int64_t* cumulative_point) override;
+
+    /// Its main policy is picking rowsets from candidate rowsets by Condition 1, 2, 3.
+    int pick_input_rowsets(Tablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets,

Review Comment:
   Consider rewriting this function's signature to `int pick_input_rowsets(Tablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets, Version* last_delete_version, size_t* compaction_score)`, and using the config values directly in the function implementation.
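   Reading the thresholds straight from the BE config inside the implementation could look roughly like this (config names are the ones added by this PR; the surrounding selection loop stays as-is):

   ```cpp
   // sketch: no constructor-injected thresholds, just the config values
   const int64_t goal_size =
           config::time_series_compaction_goal_size_mbytes * 1024 * 1024;
   const int64_t file_count_threshold =
           config::time_series_compaction_file_count_threshold;
   const int64_t time_threshold_ms =
           config::time_series_compaction_time_threshold_seconds * 1000;
   ```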



##########
be/src/olap/cumulative_compaction_time_series_policy.cpp:
##########
@@ -0,0 +1,251 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/cumulative_compaction_time_series_policy.h"
+
+#include "common/logging.h"
+#include "olap/tablet.h"
+#include "util/time.h"
+namespace doris {
+
+TimeSeriesCumulativeCompactionPolicy::TimeSeriesCumulativeCompactionPolicy(
+        int64_t compaction_goal_size, int64_t compaction_file_count_threshold,
+        int64_t compaction_time_threshold_seconds)
+        : _compaction_goal_size(compaction_goal_size),
+          _compaction_file_count_threshold(compaction_file_count_threshold),
+          _compaction_time_threshold_seconds(compaction_time_threshold_seconds) {}
+
+uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet* tablet) {
+    uint32_t score = 0;
+    bool base_rowset_exist = false;
+    const int64_t point = tablet->cumulative_layer_point();
+
+    int64_t total_size = 0;
+    RowsetMetaSharedPtr first_meta;
+    int64_t first_version = INT64_MAX;
+    // NOTE: tablet._meta_lock is hold
+    auto& rs_metas = tablet->tablet_meta()->all_rs_metas();
+    // check the base rowset and collect the rowsets of cumulative part
+    for (auto& rs_meta : rs_metas) {
+        if (rs_meta->start_version() < first_version) {
+            first_version = rs_meta->start_version();
+            first_meta = rs_meta;
+        }
+        // check base rowset
+        if (rs_meta->start_version() == 0) {
+            base_rowset_exist = true;
+        }
+        if (rs_meta->end_version() < point || !rs_meta->is_local()) {
+            // all_rs_metas() is not sorted, so we use _continue_ other than _break_ here.
+            continue;
+        } else {
+            // collect the rowsets of cumulative part
+            total_size += rs_meta->total_disk_size();
+            score += rs_meta->get_compaction_score();
+        }
+    }
+
+    if (first_meta == nullptr) {
+        return 0;
+    }
+
+    // If base version does not exist, but its state is RUNNING.
+    // It is abnormal, do not select it and set *score = 0
+    if (!base_rowset_exist && tablet->tablet_state() == TABLET_RUNNING) {
+        LOG(WARNING) << "tablet state is running but have no base version";
+        return 0;
+    }
+
+    // Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
+    if (total_size >= _compaction_goal_size) {
+        return score;
+    }
+
+    // Condition 2: the number of input files reaches the threshold specified by parameter _compaction_file_count_threshold
+    if (score >= _compaction_file_count_threshold) {
+        return score;
+    }
+
+    int64_t now = UnixMillis();
+    int64_t last_cumu = tablet->last_cumu_compaction_success_time();
+    if (last_cumu != 0) {
+        int64_t cumu_interval = now - last_cumu;
+
+        // Condition 3: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_second
+        if (cumu_interval > _compaction_time_threshold_seconds * 1000) {
+            return score;
+        }
+    } 
+
+    return 0;
+}
+
+void TimeSeriesCumulativeCompactionPolicy::calculate_cumulative_point(
+        Tablet* tablet, const std::vector<RowsetMetaSharedPtr>& all_metas,
+        int64_t current_cumulative_point, int64_t* ret_cumulative_point) {
+    *ret_cumulative_point = Tablet::K_INVALID_CUMULATIVE_POINT;
+    if (current_cumulative_point != Tablet::K_INVALID_CUMULATIVE_POINT) {
+        // only calculate the point once.
+        // after that, cumulative point will be updated along with compaction process.
+        return;
+    }
+    // empty return
+    if (all_metas.empty()) {
+        return;
+    }
+
+    std::list<RowsetMetaSharedPtr> existing_rss;
+    for (auto& rs : all_metas) {
+        existing_rss.emplace_back(rs);
+    }
+

Review Comment:
   
   ```suggestion
       auto rs_metas = tablet->all_rs_metas();
   ```



##########
be/src/olap/cumulative_compaction_time_series_policy.cpp:
##########
@@ -0,0 +1,251 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/cumulative_compaction_time_series_policy.h"
+
+#include "common/logging.h"
+#include "olap/tablet.h"
+#include "util/time.h"
+namespace doris {
+
+TimeSeriesCumulativeCompactionPolicy::TimeSeriesCumulativeCompactionPolicy(
+        int64_t compaction_goal_size, int64_t compaction_file_count_threshold,
+        int64_t compaction_time_threshold_seconds)
+        : _compaction_goal_size(compaction_goal_size),
+          _compaction_file_count_threshold(compaction_file_count_threshold),
+          _compaction_time_threshold_seconds(compaction_time_threshold_seconds) {}
+
+uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet* tablet) {
+    uint32_t score = 0;
+    bool base_rowset_exist = false;
+    const int64_t point = tablet->cumulative_layer_point();
+
+    int64_t total_size = 0;
+    RowsetMetaSharedPtr first_meta;
+    int64_t first_version = INT64_MAX;
+    // NOTE: tablet._meta_lock is hold
+    auto& rs_metas = tablet->tablet_meta()->all_rs_metas();
+    // check the base rowset and collect the rowsets of cumulative part
+    for (auto& rs_meta : rs_metas) {
+        if (rs_meta->start_version() < first_version) {
+            first_version = rs_meta->start_version();
+            first_meta = rs_meta;
+        }
+        // check base rowset
+        if (rs_meta->start_version() == 0) {
+            base_rowset_exist = true;
+        }
+        if (rs_meta->end_version() < point || !rs_meta->is_local()) {
+            // all_rs_metas() is not sorted, so we use _continue_ other than _break_ here.
+            continue;
+        } else {
+            // collect the rowsets of cumulative part
+            total_size += rs_meta->total_disk_size();
+            score += rs_meta->get_compaction_score();
+        }
+    }
+
+    if (first_meta == nullptr) {
+        return 0;
+    }
+
+    // If base version does not exist, but its state is RUNNING.
+    // It is abnormal, do not select it and set *score = 0
+    if (!base_rowset_exist && tablet->tablet_state() == TABLET_RUNNING) {
+        LOG(WARNING) << "tablet state is running but have no base version";
+        return 0;
+    }
+
+    // Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
+    if (total_size >= _compaction_goal_size) {
+        return score;
+    }
+
+    // Condition 2: the number of input files reaches the threshold specified by parameter _compaction_file_count_threshold
+    if (score >= _compaction_file_count_threshold) {
+        return score;
+    }
+
+    int64_t now = UnixMillis();
+    int64_t last_cumu = tablet->last_cumu_compaction_success_time();
+    if (last_cumu != 0) {
+        int64_t cumu_interval = now - last_cumu;
+
+        // Condition 3: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_second
+        if (cumu_interval > _compaction_time_threshold_seconds * 1000) {
+            return score;
+        }
+    } 
+
+    return 0;
+}
+
+void TimeSeriesCumulativeCompactionPolicy::calculate_cumulative_point(
+        Tablet* tablet, const std::vector<RowsetMetaSharedPtr>& all_metas,
+        int64_t current_cumulative_point, int64_t* ret_cumulative_point) {
+    *ret_cumulative_point = Tablet::K_INVALID_CUMULATIVE_POINT;
+    if (current_cumulative_point != Tablet::K_INVALID_CUMULATIVE_POINT) {
+        // only calculate the point once.
+        // after that, cumulative point will be updated along with compaction process.
+        return;
+    }
+    // empty return
+    if (all_metas.empty()) {
+        return;
+    }
+
+    std::list<RowsetMetaSharedPtr> existing_rss;
+    for (auto& rs : all_metas) {
+        existing_rss.emplace_back(rs);
+    }
+
+    // sort the existing rowsets by version in ascending order
+    existing_rss.sort([](const RowsetMetaSharedPtr& a, const RowsetMetaSharedPtr& b) {
+        // simple because 2 versions are certainly not overlapping
+        return a->version().first < b->version().first;
+    });
+
+    // calculate promotion size
+    auto base_rowset_meta = existing_rss.begin();
+
+    if (tablet->tablet_state() == TABLET_RUNNING) {
+        // check base rowset first version must be zero
+        // for tablet which state is not TABLET_RUNNING, there may not have base version.
+        CHECK((*base_rowset_meta)->start_version() == 0);
+

Review Comment:
   
   ```suggestion
       if (tablet->tablet_state() != TABLET_RUNNING) return;
   ```





[GitHub] [doris] csun5285 commented on a diff in pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "csun5285 (via GitHub)" <gi...@apache.org>.
csun5285 commented on code in PR #20715:
URL: https://github.com/apache/doris/pull/20715#discussion_r1227935905


##########
be/src/olap/cumulative_compaction_time_series_policy.cpp:
##########
@@ -0,0 +1,255 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/cumulative_compaction_time_series_policy.h"
+
+#include "common/logging.h"
+#include "olap/tablet.h"
+#include "util/time.h"
+namespace doris {
+
+TimeSeriesCumulativeCompactionPolicy::TimeSeriesCumulativeCompactionPolicy(
+        int64_t compaction_goal_size, int64_t compaction_file_count_threshold,
+        int64_t compaction_time_threshold_seconds)
+        : _compaction_goal_size(compaction_goal_size),
+          _compaction_file_count_threshold(compaction_file_count_threshold),
+          _compaction_time_threshold_seconds(compaction_time_threshold_seconds) {}
+
+uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet* tablet) {
+    uint32_t score = 0;
+    bool base_rowset_exist = false;
+    const int64_t point = tablet->cumulative_layer_point();
+
+    int64_t total_size = 0;
+    RowsetMetaSharedPtr first_meta;
+    int64_t first_version = INT64_MAX;
+    // NOTE: tablet._meta_lock is hold
+    auto& rs_metas = tablet->tablet_meta()->all_rs_metas();
+    // check the base rowset and collect the rowsets of cumulative part
+    for (auto& rs_meta : rs_metas) {
+        if (rs_meta->start_version() < first_version) {
+            first_version = rs_meta->start_version();
+            first_meta = rs_meta;
+        }
+        // check base rowset
+        if (rs_meta->start_version() == 0) {
+            base_rowset_exist = true;
+        }
+        if (rs_meta->end_version() < point || !rs_meta->is_local()) {
+            // all_rs_metas() is not sorted, so we use _continue_ other than _break_ here.
+            continue;
+        } else {
+            // collect the rowsets of cumulative part
+            total_size += rs_meta->total_disk_size();
+            score += rs_meta->get_compaction_score();
+        }
+    }
+
+    if (first_meta == nullptr) {
+        return 0;
+    }
+
+    // If base version does not exist, but its state is RUNNING.
+    // It is abnormal, do not select it and set *score = 0
+    if (!base_rowset_exist && tablet->tablet_state() == TABLET_RUNNING) {
+        LOG(WARNING) << "tablet state is running but have no base version";
+        return 0;
+    }
+
+    // Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
+    if (total_size >= _compaction_goal_size) {
+        return score;
+    }
+
+    // Condition 2: the number of input files reaches the threshold specified by parameter _compaction_file_count_threshold
+    if (score >= _compaction_file_count_threshold) {
+        return score;
+    }
+
+    int64_t now = UnixMillis();
+    int64_t last_cumu = tablet->last_cumu_compaction_success_time();
+    if (last_cumu != 0) {
+        int64_t cumu_interval = now - last_cumu;
+
+        // Condition 3: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_second
+        if (cumu_interval > _compaction_time_threshold_seconds * 1000) {
+            return score;
+        }
+    } else {
+        tablet->set_last_cumu_compaction_success_time(now);
+    }
+
+    return 0;
+}
+
+void TimeSeriesCumulativeCompactionPolicy::calculate_cumulative_point(
+        Tablet* tablet, const std::vector<RowsetMetaSharedPtr>& all_metas,
+        int64_t current_cumulative_point, int64_t* ret_cumulative_point) {
+    *ret_cumulative_point = Tablet::K_INVALID_CUMULATIVE_POINT;
+    if (current_cumulative_point != Tablet::K_INVALID_CUMULATIVE_POINT) {
+        // only calculate the point once.
+        // after that, cumulative point will be updated along with compaction process.
+        return;
+    }
+    // empty return
+    if (all_metas.empty()) {
+        return;
+    }
+
+    std::list<RowsetMetaSharedPtr> existing_rss;
+    for (auto& rs : all_metas) {
+        existing_rss.emplace_back(rs);
+    }
+
+    // sort the existing rowsets by version in ascending order
+    existing_rss.sort([](const RowsetMetaSharedPtr& a, const RowsetMetaSharedPtr& b) {
+        // simple because 2 versions are certainly not overlapping
+        return a->version().first < b->version().first;
+    });
+
+    // calculate promotion size
+    auto base_rowset_meta = existing_rss.begin();
+
+    if (tablet->tablet_state() == TABLET_RUNNING) {
+        // check base rowset first version must be zero
+        // for tablet which state is not TABLET_RUNNING, there may not have base version.
+        CHECK((*base_rowset_meta)->start_version() == 0);
+
+        int64_t prev_version = -1;
+        for (const RowsetMetaSharedPtr& rs : existing_rss) {
+            if (rs->version().first > prev_version + 1) {
+                // There is a hole, do not continue
+                break;
+            }
+
+            bool is_delete = rs->has_delete_predicate();
+
+            // break the loop if segments in this rowset is overlapping.
+            if (!is_delete && rs->is_segments_overlapping()) {
+                *ret_cumulative_point = rs->version().first;
+                break;
+            }
+
+            // check whether the rowset size is less than _compaction_goal_size
+            // The result of compaction may be slightly smaller than the _compaction_goal_size.
+            if (!is_delete && rs->version().first != 0 &&
+                rs->total_disk_size() < (_compaction_goal_size * 0.8)) {
+                *ret_cumulative_point = rs->version().first;
+                break;
+            }
+
+            // covers the case where the rowset is not a delete version, is a singleton delta, and is NONOVERLAPPING: ret_cumulative_point increases
+            prev_version = rs->version().second;
+            *ret_cumulative_point = prev_version + 1;
+        }
+        VLOG_NOTICE
+                << "cumulative compaction time serires policy, calculate cumulative point value = "
+                << *ret_cumulative_point << " tablet = " << tablet->full_name();
+    }
+}
+
+int TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
+        Tablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets,
+        const int64_t max_compaction_score, const int64_t min_compaction_score,
+        std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version,
+        size_t* compaction_score) {
+    if (tablet->tablet_state() == TABLET_NOTREADY) {
+        return 0;
+    }
+
+    int transient_size = 0;
+    *compaction_score = 0;
+    input_rowsets->clear();
+    int64_t total_size = 0;
+
+    for (auto& rowset : candidate_rowsets) {
+        // check whether this rowset is delete version
+        if (rowset->rowset_meta()->has_delete_predicate()) {
+            *last_delete_version = rowset->version();
+            if (!input_rowsets->empty()) {
+                // we meet a delete version, and there were other versions before.
+                // we should compact those versions before handing them over to base compaction
+                break;
+            } else {
+                // we meet a delete version, and no other versions before, skip it and continue
+                input_rowsets->clear();
+                *compaction_score = 0;
+                transient_size = 0;
+                continue;
+            }
+        }
+
+        *compaction_score += rowset->rowset_meta()->get_compaction_score();
+        total_size += rowset->rowset_meta()->total_disk_size();
+
+        transient_size += 1;
+        input_rowsets->push_back(rowset);
+
+        // Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
+        if (total_size >= _compaction_goal_size) {
+            return transient_size;

Review Comment:
   last_delete_version has a separate branch





[GitHub] [doris] github-actions[bot] commented on a diff in pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on code in PR #20715:
URL: https://github.com/apache/doris/pull/20715#discussion_r1226409763


##########
be/test/olap/cumulative_compaction_time_series_policy_test.cpp:
##########
@@ -0,0 +1,573 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/cumulative_compaction_time_series_policy.h"

Review Comment:
   warning: 'olap/cumulative_compaction_time_series_policy.h' file not found [clang-diagnostic-error]
   ```cpp
   #include "olap/cumulative_compaction_time_series_policy.h"
            ^
   ```
   



##########
be/test/olap/cumulative_compaction_time_series_policy_test.cpp:
##########
@@ -0,0 +1,573 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/cumulative_compaction_time_series_policy.h"
+
+#include <gen_cpp/AgentService_types.h>
+#include <gen_cpp/olap_file.pb.h>
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+
+#include "gtest/gtest_pred_impl.h"
+#include "olap/cumulative_compaction.h"
+#include "olap/olap_common.h"
+#include "olap/rowset/rowset_meta.h"
+#include "olap/tablet.h"
+#include "olap/tablet_meta.h"
+#include "util/time.h"
+#include "util/uid_util.h"
+
+namespace doris {
+
+class TestTimeSeriesCumulativeCompactionPolicy : public testing::Test {
+public:
+    TestTimeSeriesCumulativeCompactionPolicy() {}

Review Comment:
   warning: use '= default' to define a trivial default constructor [modernize-use-equals-default]
   
   ```suggestion
       TestTimeSeriesCumulativeCompactionPolicy() = default;
   ```
   



##########
be/test/olap/cumulative_compaction_time_series_policy_test.cpp:
##########
@@ -0,0 +1,573 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/cumulative_compaction_time_series_policy.h"
+
+#include <gen_cpp/AgentService_types.h>
+#include <gen_cpp/olap_file.pb.h>
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+
+#include "gtest/gtest_pred_impl.h"
+#include "olap/cumulative_compaction.h"
+#include "olap/olap_common.h"
+#include "olap/rowset/rowset_meta.h"
+#include "olap/tablet.h"
+#include "olap/tablet_meta.h"
+#include "util/time.h"
+#include "util/uid_util.h"
+
+namespace doris {
+
+class TestTimeSeriesCumulativeCompactionPolicy : public testing::Test {
+public:
+    TestTimeSeriesCumulativeCompactionPolicy() {}
+    void SetUp() {

Review Comment:
   warning: annotate this function with 'override' or (rarely) 'final' [modernize-use-override]
   
   ```suggestion
       void SetUp() override {
   ```
   



##########
be/test/olap/cumulative_compaction_time_series_policy_test.cpp:
##########
@@ -0,0 +1,573 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/cumulative_compaction_time_series_policy.h"
+
+#include <gen_cpp/AgentService_types.h>
+#include <gen_cpp/olap_file.pb.h>
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+
+#include "gtest/gtest_pred_impl.h"
+#include "olap/cumulative_compaction.h"
+#include "olap/olap_common.h"
+#include "olap/rowset/rowset_meta.h"
+#include "olap/tablet.h"
+#include "olap/tablet_meta.h"
+#include "util/time.h"
+#include "util/uid_util.h"
+
+namespace doris {
+
+class TestTimeSeriesCumulativeCompactionPolicy : public testing::Test {
+public:
+    TestTimeSeriesCumulativeCompactionPolicy() {}
+    void SetUp() {
+        config::enable_time_series_compaction_mode = true;
+        config::time_series_compaction_goal_size_mbytes = 1024;
+        config::time_series_compaction_file_count_threshold = 10;
+        config::time_series_compaction_time_threshold_seconds = 3600;
+
+        _tablet_meta = static_cast<TabletMetaSharedPtr>(new TabletMeta(
+                1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, UniqueId(9, 10),
+                TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F));
+
+        _json_rowset_meta = R"({
+            "rowset_id": 540081,
+            "tablet_id": 15673,
+            "txn_id": 4042,
+            "tablet_schema_hash": 567997577,
+            "rowset_type": "BETA_ROWSET",
+            "rowset_state": "VISIBLE",
+            "start_version": 2,
+            "end_version": 2,
+            "num_rows": 3929,
+            "total_disk_size": 41,
+            "data_disk_size": 41,
+            "index_disk_size": 235,
+            "empty": false,
+            "load_id": {
+                "hi": -5350970832824939812,
+                "lo": -6717994719194512122
+            },
+            "creation_time": 1553765670,
+            "num_segments": 3
+        })";
+    }
+    void TearDown() {}

Review Comment:
   warning: annotate this function with 'override' or (rarely) 'final' [modernize-use-override]
   
   ```suggestion
       void TearDown() override {}
   ```
   





[GitHub] [doris] csun5285 commented on a diff in pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "csun5285 (via GitHub)" <gi...@apache.org>.
csun5285 commented on code in PR #20715:
URL: https://github.com/apache/doris/pull/20715#discussion_r1227945115


##########
be/src/common/config.cpp:
##########
@@ -992,6 +992,12 @@ DEFINE_Int32(num_broadcast_buffer, "32");
 // semi-structure configs
 DEFINE_Bool(enable_parse_multi_dimession_array, "true");
 
+// time series compaction mode
+DEFINE_Bool(enable_time_series_compaction_mode, "false");

Review Comment:
   done





[GitHub] [doris] xiaokang commented on a diff in pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "xiaokang (via GitHub)" <gi...@apache.org>.
xiaokang commented on code in PR #20715:
URL: https://github.com/apache/doris/pull/20715#discussion_r1227788905


##########
be/src/common/config.h:
##########
@@ -1008,6 +1008,12 @@ DECLARE_Int32(num_broadcast_buffer);
 // semi-structure configs
 DECLARE_Bool(enable_parse_multi_dimession_array);
 
+// time series compaction mode
+DECLARE_Bool(enable_time_series_compaction_mode);
+DECLARE_mInt64(time_series_compaction_goal_size_mbytes);

Review Comment:
   add comment for each config
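   For instance, roughly mirroring the zh-CN doc text added in this PR (wording is only a sketch):

   ```cpp
   // In time series scenarios, enable this mode to reduce write amplification;
   // the time_series_compaction_* configs below only take effect when it is on.
   DECLARE_Bool(enable_time_series_compaction_mode);
   // Target total input size (in MB) of one cumulative compaction; the output
   // file ends up roughly the same size as the input.
   DECLARE_mInt64(time_series_compaction_goal_size_mbytes);
   ```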



##########
docs/zh-CN/docs/admin-manual/config/be-config.md:
##########
@@ -663,6 +663,32 @@ Metrics: {"filtered_rows":0,"input_row_num":3346807,"input_rowsets_count":42,"in
 * 描述:更新 peer replica infos 的最小间隔时间
 * 默认值:10(s)
 
+#### `enable_time_series_compaction_mode`
+
+* 类型:bool
+* 描述:在时序场景下,开启 time series compaction 来减少写放大,避免长期 compaction 占据大量 cpu
+  - 开启 time series compaction 时,将使用 time_series_compaction 为前缀的参数来调整 compaction 的执行
+* 默认值:false
+
+#### `time_series_compaction_goal_size_mbytes`
+
+* 类型:int64
+* 描述:开启 time series compaction 时,将使用此参数来调整每次 compaction 输入的文件的大小,输出的文件大小可能比配置值略小

Review Comment:
   The output file size is about the same as the input.



##########
be/src/common/config.cpp:
##########
@@ -992,6 +992,12 @@ DEFINE_Int32(num_broadcast_buffer, "32");
 // semi-structure configs
 DEFINE_Bool(enable_parse_multi_dimession_array, "true");
 
+// time series compaction mode
+DEFINE_Bool(enable_time_series_compaction_mode, "false");

Review Comment:
   compaction_mode=time_series



##########
be/src/olap/cumulative_compaction_time_series_policy.cpp:
##########
@@ -0,0 +1,255 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/cumulative_compaction_time_series_policy.h"
+
+#include "common/logging.h"
+#include "olap/tablet.h"
+#include "util/time.h"
+namespace doris {
+
+TimeSeriesCumulativeCompactionPolicy::TimeSeriesCumulativeCompactionPolicy(
+        int64_t compaction_goal_size, int64_t compaction_file_count_threshold,
+        int64_t compaction_time_threshold_seconds)
+        : _compaction_goal_size(compaction_goal_size),
+          _compaction_file_count_threshold(compaction_file_count_threshold),
+          _compaction_time_threshold_seconds(compaction_time_threshold_seconds) {}
+
+uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet* tablet) {
+    uint32_t score = 0;
+    bool base_rowset_exist = false;
+    const int64_t point = tablet->cumulative_layer_point();
+
+    int64_t total_size = 0;
+    RowsetMetaSharedPtr first_meta;
+    int64_t first_version = INT64_MAX;
+    // NOTE: tablet._meta_lock is hold
+    auto& rs_metas = tablet->tablet_meta()->all_rs_metas();
+    // check the base rowset and collect the rowsets of cumulative part
+    for (auto& rs_meta : rs_metas) {
+        if (rs_meta->start_version() < first_version) {
+            first_version = rs_meta->start_version();
+            first_meta = rs_meta;
+        }
+        // check base rowset
+        if (rs_meta->start_version() == 0) {
+            base_rowset_exist = true;
+        }
+        if (rs_meta->end_version() < point || !rs_meta->is_local()) {
+            // all_rs_metas() is not sorted, so we use _continue_ other than _break_ here.
+            continue;
+        } else {
+            // collect the rowsets of cumulative part
+            total_size += rs_meta->total_disk_size();
+            score += rs_meta->get_compaction_score();
+        }
+    }
+
+    if (first_meta == nullptr) {
+        return 0;
+    }
+
+    // If base version does not exist, but its state is RUNNING.
+    // It is abnormal, do not select it and set *score = 0
+    if (!base_rowset_exist && tablet->tablet_state() == TABLET_RUNNING) {
+        LOG(WARNING) << "tablet state is running but have no base version";
+        return 0;
+    }
+
+    // Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
+    if (total_size >= _compaction_goal_size) {
+        return score;
+    }
+
+    // Condition 2: the number of input files reaches the threshold specified by parameter _compaction_file_count_threshold
+    if (score >= _compaction_file_count_threshold) {
+        return score;
+    }
+
+    int64_t now = UnixMillis();
+    int64_t last_cumu = tablet->last_cumu_compaction_success_time();
+    if (last_cumu != 0) {
+        int64_t cumu_interval = now - last_cumu;
+
+        // Condition 3: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_second
+        if (cumu_interval > _compaction_time_threshold_seconds * 1000) {
+            return score;
+        }
+    } else {
+        tablet->set_last_cumu_compaction_success_time(now);

Review Comment:
   It seems wrong to update the success time here, since no compaction has actually succeeded.
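   One read-only alternative (just a sketch; it assumes `RowsetMeta::creation_time()` returns a Unix timestamp in seconds, as in the test fixture in this PR) would be to fall back to the oldest rowset's creation time when no cumulative compaction has ever succeeded, instead of stamping a success time from the score path:

   ```cpp
   // sketch: keep calc_cumulative_compaction_score() free of side effects
   int64_t now = UnixMillis();
   int64_t last_cumu = tablet->last_cumu_compaction_success_time();
   int64_t reference_ms =
           (last_cumu != 0) ? last_cumu : first_meta->creation_time() * 1000;
   // Condition 3: enough time has passed since the last compaction (or since
   // the oldest rowset was created, if none has run yet)
   if (now - reference_ms > _compaction_time_threshold_seconds * 1000) {
       return score;
   }
   return 0;
   ```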



##########
docs/zh-CN/docs/admin-manual/config/be-config.md:
##########
@@ -663,6 +663,32 @@ Metrics: {"filtered_rows":0,"input_row_num":3346807,"input_rowsets_count":42,"in
 * 描述:更新 peer replica infos 的最小间隔时间
 * 默认值:10(s)
 
+#### `enable_time_series_compaction_mode`
+
+* 类型:bool
+* 描述:在时序场景下,开启 time series compaction 来减少写放大,避免长期 compaction 占据大量 cpu
+  - 开启 time series compaction 时,将使用 time_series_compaction 为前缀的参数来调整 compaction 的执行
+* 默认值:false
+
+#### `time_series_compaction_goal_size_mbytes`
+
+* 类型:int64
+* 描述:开启 time series compaction 时,将使用此参数来调整每次 compaction 输入的文件的大小,输出的文件大小可能比配置值略小
+* 默认值:1024
+
+#### `time_series_compaction_file_count_threshold`
+
+* 类型:int64
+* 描述:开启 time series compaction 时,将使用此参数来调整每次 compaction 输入的文件数量的最小值,只有当 time_series_compaction_goal_size_mbytes 条件不满足时,该参数才会发挥作用
+  - 在开启 single tablet load 时,会产生大量空 rowset,需要调大 time_series_compaction_file_count_threshold

Review Comment:
   Compaction is triggered once the number of files in a tablet exceeds this config.



##########
be/src/olap/tablet.cpp:
##########
@@ -1062,9 +1063,14 @@ uint32_t Tablet::_calc_base_compaction_score() const {
             // all_rs_metas() is not sorted, so we use _continue_ other than _break_ here.
             continue;
         }
-
+        if (rs_meta->has_delete_predicate()) {
+            has_delete = true;
+        }
         score += rs_meta->get_compaction_score();
     }
+    if (config::enable_time_series_compaction_mode) {
+        return base_rowset_exist && has_delete ? score : 0;

Review Comment:
   (base_rowset_exist && has_delete)



##########
be/src/olap/cumulative_compaction_time_series_policy.cpp:
##########
@@ -0,0 +1,255 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/cumulative_compaction_time_series_policy.h"
+
+#include "common/logging.h"
+#include "olap/tablet.h"
+#include "util/time.h"
+namespace doris {
+
+TimeSeriesCumulativeCompactionPolicy::TimeSeriesCumulativeCompactionPolicy(
+        int64_t compaction_goal_size, int64_t compaction_file_count_threshold,
+        int64_t compaction_time_threshold_seconds)
+        : _compaction_goal_size(compaction_goal_size),
+          _compaction_file_count_threshold(compaction_file_count_threshold),
+          _compaction_time_threshold_seconds(compaction_time_threshold_seconds) {}
+
+uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet* tablet) {
+    uint32_t score = 0;
+    bool base_rowset_exist = false;
+    const int64_t point = tablet->cumulative_layer_point();
+
+    int64_t total_size = 0;
+    RowsetMetaSharedPtr first_meta;
+    int64_t first_version = INT64_MAX;
+    // NOTE: tablet._meta_lock is hold
+    auto& rs_metas = tablet->tablet_meta()->all_rs_metas();
+    // check the base rowset and collect the rowsets of cumulative part
+    for (auto& rs_meta : rs_metas) {
+        if (rs_meta->start_version() < first_version) {
+            first_version = rs_meta->start_version();
+            first_meta = rs_meta;
+        }
+        // check base rowset
+        if (rs_meta->start_version() == 0) {
+            base_rowset_exist = true;
+        }
+        if (rs_meta->end_version() < point || !rs_meta->is_local()) {

Review Comment:
   Does this compaction policy still work if is_local() is false?



##########
be/src/olap/tablet.cpp:
##########
@@ -1062,9 +1063,14 @@ uint32_t Tablet::_calc_base_compaction_score() const {
             // all_rs_metas() is not sorted, so we use _continue_ other than _break_ here.
             continue;
         }
-
+        if (rs_meta->has_delete_predicate()) {
+            has_delete = true;
+        }
         score += rs_meta->get_compaction_score();
     }
+    if (config::enable_time_series_compaction_mode) {

Review Comment:
   add comment to explain why
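   Something like the note that shows up in the later revision of this diff would do, e.g.:

   ```cpp
   // In the time series compaction policy, we want base compaction to be
   // triggered when delete versions are present, so only report a non-zero
   // base compaction score in that case.
   if (config::enable_time_series_compaction_mode) {
       return (base_rowset_exist && has_delete) ? score : 0;
   }
   ```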



##########
be/src/olap/cumulative_compaction_time_series_policy.cpp:
##########
@@ -0,0 +1,255 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/cumulative_compaction_time_series_policy.h"
+
+#include "common/logging.h"
+#include "olap/tablet.h"
+#include "util/time.h"
+namespace doris {
+
+TimeSeriesCumulativeCompactionPolicy::TimeSeriesCumulativeCompactionPolicy(
+        int64_t compaction_goal_size, int64_t compaction_file_count_threshold,
+        int64_t compaction_time_threshold_seconds)
+        : _compaction_goal_size(compaction_goal_size),
+          _compaction_file_count_threshold(compaction_file_count_threshold),
+          _compaction_time_threshold_seconds(compaction_time_threshold_seconds) {}
+
+uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet* tablet) {
+    uint32_t score = 0;
+    bool base_rowset_exist = false;
+    const int64_t point = tablet->cumulative_layer_point();
+
+    int64_t total_size = 0;
+    RowsetMetaSharedPtr first_meta;
+    int64_t first_version = INT64_MAX;
+    // NOTE: tablet._meta_lock is hold
+    auto& rs_metas = tablet->tablet_meta()->all_rs_metas();
+    // check the base rowset and collect the rowsets of cumulative part
+    for (auto& rs_meta : rs_metas) {
+        if (rs_meta->start_version() < first_version) {
+            first_version = rs_meta->start_version();
+            first_meta = rs_meta;
+        }
+        // check base rowset
+        if (rs_meta->start_version() == 0) {
+            base_rowset_exist = true;
+        }
+        if (rs_meta->end_version() < point || !rs_meta->is_local()) {
+            // all_rs_metas() is not sorted, so we use _continue_ other than _break_ here.
+            continue;
+        } else {
+            // collect the rowsets of cumulative part
+            total_size += rs_meta->total_disk_size();
+            score += rs_meta->get_compaction_score();
+        }
+    }
+
+    if (first_meta == nullptr) {
+        return 0;
+    }
+
+    // If base version does not exist, but its state is RUNNING.
+    // It is abnormal, do not select it and set *score = 0
+    if (!base_rowset_exist && tablet->tablet_state() == TABLET_RUNNING) {
+        LOG(WARNING) << "tablet state is running but have no base version";
+        return 0;
+    }
+
+    // Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
+    if (total_size >= _compaction_goal_size) {
+        return score;
+    }
+
+    // Condition 2: the number of input files reaches the threshold specified by parameter _compaction_file_count_threshold
+    if (score >= _compaction_file_count_threshold) {
+        return score;
+    }
+
+    int64_t now = UnixMillis();
+    int64_t last_cumu = tablet->last_cumu_compaction_success_time();
+    if (last_cumu != 0) {
+        int64_t cumu_interval = now - last_cumu;
+
+        // Condition 3: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_second
+        if (cumu_interval > _compaction_time_threshold_seconds * 1000) {
+            return score;
+        }
+    } else {
+        tablet->set_last_cumu_compaction_success_time(now);
+    }
+
+    return 0;
+}
+
+void TimeSeriesCumulativeCompactionPolicy::calculate_cumulative_point(
+        Tablet* tablet, const std::vector<RowsetMetaSharedPtr>& all_metas,
+        int64_t current_cumulative_point, int64_t* ret_cumulative_point) {
+    *ret_cumulative_point = Tablet::K_INVALID_CUMULATIVE_POINT;
+    if (current_cumulative_point != Tablet::K_INVALID_CUMULATIVE_POINT) {
+        // only calculate the point once.
+        // after that, cumulative point will be updated along with compaction process.
+        return;
+    }
+    // empty return
+    if (all_metas.empty()) {
+        return;
+    }
+
+    std::list<RowsetMetaSharedPtr> existing_rss;
+    for (auto& rs : all_metas) {
+        existing_rss.emplace_back(rs);
+    }
+
+    // sort the existing rowsets by version in ascending order
+    existing_rss.sort([](const RowsetMetaSharedPtr& a, const RowsetMetaSharedPtr& b) {
+        // simple because 2 versions are certainly not overlapping
+        return a->version().first < b->version().first;
+    });
+
+    // calculate promotion size
+    auto base_rowset_meta = existing_rss.begin();
+
+    if (tablet->tablet_state() == TABLET_RUNNING) {
+        // the first version of the base rowset must be zero
+        // for a tablet whose state is not TABLET_RUNNING, there may be no base version.
+        CHECK((*base_rowset_meta)->start_version() == 0);
+
+        int64_t prev_version = -1;
+        for (const RowsetMetaSharedPtr& rs : existing_rss) {
+            if (rs->version().first > prev_version + 1) {
+                // There is a hole, do not continue
+                break;
+            }
+
+            bool is_delete = rs->has_delete_predicate();
+
+            // break the loop if segments in this rowset are overlapping.
+            if (!is_delete && rs->is_segments_overlapping()) {
+                *ret_cumulative_point = rs->version().first;
+                break;
+            }
+
+            // check whether the rowset is smaller than _compaction_goal_size
+            // The result of compaction may be slightly smaller than _compaction_goal_size.
+            if (!is_delete && rs->version().first != 0 &&
+                rs->total_disk_size() < (_compaction_goal_size * 0.8)) {
+                *ret_cumulative_point = rs->version().first;
+                break;
+            }
+
+            // covers the case where the rowset has no delete predicate, is a singleton delta, and is NONOVERLAPPING: ret_cumulative_point advances
+            prev_version = rs->version().second;
+            *ret_cumulative_point = prev_version + 1;
+        }
+        VLOG_NOTICE
+                << "cumulative compaction time serires policy, calculate cumulative point value = "
+                << *ret_cumulative_point << " tablet = " << tablet->full_name();
+    }
+}
+
+int TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
+        Tablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets,
+        const int64_t max_compaction_score, const int64_t min_compaction_score,
+        std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version,
+        size_t* compaction_score) {
+    if (tablet->tablet_state() == TABLET_NOTREADY) {
+        return 0;
+    }
+
+    int transient_size = 0;
+    *compaction_score = 0;
+    input_rowsets->clear();
+    int64_t total_size = 0;
+
+    for (auto& rowset : candidate_rowsets) {
+        // check whether this rowset is delete version
+        if (rowset->rowset_meta()->has_delete_predicate()) {
+            *last_delete_version = rowset->version();
+            if (!input_rowsets->empty()) {
+                // we meet a delete version, and there were other versions before it.
+                // we should compact those versions before handing them over to base compaction
+                break;
+            } else {
+                // we meet a delete version, and no other versions before, skip it and continue
+                input_rowsets->clear();
+                *compaction_score = 0;
+                transient_size = 0;
+                continue;
+            }
+        }
+
+        *compaction_score += rowset->rowset_meta()->get_compaction_score();
+        total_size += rowset->rowset_meta()->total_disk_size();
+
+        transient_size += 1;
+        input_rowsets->push_back(rowset);
+
+        // Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
+        if (total_size >= _compaction_goal_size) {
+            return transient_size;

Review Comment:
   Is it necessary to consider last_delete_version before return?
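   
   A hypothetical sketch of the concern (not code from this PR), assuming `Version` exposes `first`/`second` and that `{-1, -1}` is treated by callers as "no delete version":
   
   ```cpp
   // If a delete version was skipped earlier in the loop, *last_delete_version may still
   // point at it when the goal-size branch returns, even though it precedes every picked
   // rowset. One option would be to clear it before returning:
   if (total_size >= _compaction_goal_size) {
       if (!input_rowsets->empty() &&
           last_delete_version->first < input_rowsets->front()->start_version()) {
           *last_delete_version = {-1, -1};  // assumption: {-1, -1} means "unset"
       }
       return transient_size;
   }
   ```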





[GitHub] [doris] csun5285 commented on a diff in pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "csun5285 (via GitHub)" <gi...@apache.org>.
csun5285 commented on code in PR #20715:
URL: https://github.com/apache/doris/pull/20715#discussion_r1227899646


##########
be/src/olap/tablet.cpp:
##########
@@ -1062,9 +1063,14 @@ uint32_t Tablet::_calc_base_compaction_score() const {
             // all_rs_metas() is not sorted, so we use _continue_ other than _break_ here.
             continue;
         }
-
+        if (rs_meta->has_delete_predicate()) {
+            has_delete = true;
+        }
         score += rs_meta->get_compaction_score();
     }
+    if (config::enable_time_series_compaction_mode) {
+        return base_rowset_exist && has_delete ? score : 0;

Review Comment:
   done



##########
be/src/olap/tablet.cpp:
##########
@@ -1062,9 +1063,14 @@ uint32_t Tablet::_calc_base_compaction_score() const {
             // all_rs_metas() is not sorted, so we use _continue_ other than _break_ here.
             continue;
         }
-
+        if (rs_meta->has_delete_predicate()) {
+            has_delete = true;
+        }
         score += rs_meta->get_compaction_score();
     }
+    if (config::enable_time_series_compaction_mode) {

Review Comment:
   done





[GitHub] [doris] csun5285 commented on pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "csun5285 (via GitHub)" <gi...@apache.org>.
csun5285 commented on PR #20715:
URL: https://github.com/apache/doris/pull/20715#issuecomment-1590666165

   run buildall




[GitHub] [doris] csun5285 commented on a diff in pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "csun5285 (via GitHub)" <gi...@apache.org>.
csun5285 commented on code in PR #20715:
URL: https://github.com/apache/doris/pull/20715#discussion_r1227897179


##########
docs/zh-CN/docs/admin-manual/config/be-config.md:
##########
@@ -663,6 +663,32 @@ Metrics: {"filtered_rows":0,"input_row_num":3346807,"input_rowsets_count":42,"in
 * Description: minimum interval for updating peer replica infos
 * Default: 10 (s)
 
+#### `enable_time_series_compaction_mode`
+
+* Type: bool
+* Description: In time-series scenarios, enable time series compaction to reduce write amplification and to keep long-running compaction from occupying a large amount of CPU
+  - When time series compaction is enabled, parameters prefixed with time_series_compaction are used to adjust how compaction is executed
+* Default: false
+
+#### `time_series_compaction_goal_size_mbytes`
+
+* Type: int64
+* Description: When time series compaction is enabled, this parameter adjusts the size of the input files for each compaction; the output file size may be slightly smaller than the configured value
+* Default: 1024
+
+#### `time_series_compaction_file_count_threshold`
+
+* Type: int64
+* Description: When time series compaction is enabled, this parameter sets the minimum number of input files for each compaction; it only takes effect when the time_series_compaction_goal_size_mbytes condition is not met
+  - When single tablet load is enabled, a large number of empty rowsets are generated, so time_series_compaction_file_count_threshold needs to be increased

Review Comment:
   done





[GitHub] [doris] csun5285 commented on a diff in pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "csun5285 (via GitHub)" <gi...@apache.org>.
csun5285 commented on code in PR #20715:
URL: https://github.com/apache/doris/pull/20715#discussion_r1227893101


##########
be/src/common/config.h:
##########
@@ -1008,6 +1008,12 @@ DECLARE_Int32(num_broadcast_buffer);
 // semi-structure configs
 DECLARE_Bool(enable_parse_multi_dimession_array);
 
+// time series compaction mode
+DECLARE_Bool(enable_time_series_compaction_mode);
+DECLARE_mInt64(time_series_compaction_goal_size_mbytes);

Review Comment:
   done





[GitHub] [doris] csun5285 commented on pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "csun5285 (via GitHub)" <gi...@apache.org>.
csun5285 commented on PR #20715:
URL: https://github.com/apache/doris/pull/20715#issuecomment-1590538619

   run buildall




[GitHub] [doris] csun5285 commented on pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "csun5285 (via GitHub)" <gi...@apache.org>.
csun5285 commented on PR #20715:
URL: https://github.com/apache/doris/pull/20715#issuecomment-1588836608

   run p0




[GitHub] [doris] platoneko commented on a diff in pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "platoneko (via GitHub)" <gi...@apache.org>.
platoneko commented on code in PR #20715:
URL: https://github.com/apache/doris/pull/20715#discussion_r1229090078


##########
be/src/olap/cumulative_compaction_time_series_policy.cpp:
##########
@@ -0,0 +1,245 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/cumulative_compaction_time_series_policy.h"
+
+#include "common/logging.h"
+#include "olap/tablet.h"
+#include "util/time.h"
+namespace doris {
+
+uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet* tablet) {
+    uint32_t score = 0;
+    bool base_rowset_exist = false;
+    const int64_t point = tablet->cumulative_layer_point();
+
+    int64_t total_size = 0;
+    RowsetMetaSharedPtr first_meta;
+    int64_t first_version = INT64_MAX;
+    // NOTE: tablet._meta_lock is held
+    auto& rs_metas = tablet->tablet_meta()->all_rs_metas();
+    // check the base rowset and collect the rowsets of cumulative part
+    for (auto& rs_meta : rs_metas) {
+        if (rs_meta->start_version() < first_version) {
+            first_version = rs_meta->start_version();
+            first_meta = rs_meta;
+        }
+        // check base rowset
+        if (rs_meta->start_version() == 0) {
+            base_rowset_exist = true;
+        }
+        if (rs_meta->end_version() < point || !rs_meta->is_local()) {
+            // all_rs_metas() is not sorted, so we use _continue_ rather than _break_ here.
+            continue;
+        } else {
+            // collect the rowsets of cumulative part
+            total_size += rs_meta->total_disk_size();
+            score += rs_meta->get_compaction_score();
+        }
+    }
+
+    if (first_meta == nullptr) {
+        return 0;
+    }
+
+    // If the base version does not exist but the tablet state is RUNNING,
+    // it is abnormal: do not select this tablet and return a score of 0.
+    if (!base_rowset_exist && tablet->tablet_state() == TABLET_RUNNING) {
+        LOG(WARNING) << "tablet state is running but has no base version";
+        return 0;
+    }
+
+    // Condition 1: the size of input files for compaction meets the requirement of parameter time_series_compaction_goal_size_mbytes
+    if (total_size >= (config::time_series_compaction_goal_size_mbytes * 1024 * 1024)) {
+        return score;
+    }
+
+    // Condition 2: the number of input files reaches the threshold specified by parameter time_series_compaction_file_count_threshold
+    if (score >= config::time_series_compaction_file_count_threshold) {
+        return score;
+    }
+
+    int64_t now = UnixMillis();
+    int64_t last_cumu = tablet->last_cumu_compaction_success_time();
+    if (last_cumu != 0) {
+        int64_t cumu_interval = now - last_cumu;
+
+        // Condition 3: the time interval between compactions exceeds the value specified by parameter time_series_compaction_time_threshold_seconds
+        if (cumu_interval > (config::time_series_compaction_time_threshold_seconds * 1000)) {
+            return score;
+        }
+    }
+
+    return 0;
+}
+
+void TimeSeriesCumulativeCompactionPolicy::calculate_cumulative_point(
+        Tablet* tablet, const std::vector<RowsetMetaSharedPtr>& all_metas,
+        int64_t current_cumulative_point, int64_t* ret_cumulative_point) {
+    *ret_cumulative_point = Tablet::K_INVALID_CUMULATIVE_POINT;
+    if (current_cumulative_point != Tablet::K_INVALID_CUMULATIVE_POINT) {
+        // only calculate the point once.
+        // after that, cumulative point will be updated along with compaction process.
+        return;
+    }
+    // empty return
+    if (all_metas.empty()) {
+        return;
+    }
+
+    std::list<RowsetMetaSharedPtr> existing_rss;
+    for (auto& rs : all_metas) {
+        existing_rss.emplace_back(rs);
+    }
+
+    // sort the existing rowsets by version in ascending order
+    existing_rss.sort([](const RowsetMetaSharedPtr& a, const RowsetMetaSharedPtr& b) {
+        // simple because 2 versions are certainly not overlapping
+        return a->version().first < b->version().first;
+    });
+
+    // calculate promotion size
+    auto base_rowset_meta = existing_rss.begin();
+
+    if (tablet->tablet_state() == TABLET_RUNNING) {
+        // the first version of the base rowset must be zero
+        // for a tablet whose state is not TABLET_RUNNING, there may be no base version.
+        CHECK((*base_rowset_meta)->start_version() == 0);
+
+        int64_t prev_version = -1;
+        for (const RowsetMetaSharedPtr& rs : existing_rss) {
+            if (rs->version().first > prev_version + 1) {
+                // There is a hole, do not continue
+                break;
+            }
+
+            bool is_delete = rs->has_delete_predicate();
+
+            // break the loop if segments in this rowset are overlapping.
+            if (!is_delete && rs->is_segments_overlapping()) {
+                *ret_cumulative_point = rs->version().first;
+                break;
+            }
+
+            // check whether the rowset is smaller than time_series_compaction_goal_size_mbytes
+            // The result of compaction may be slightly smaller than the configured goal size.
+            if (!is_delete && rs->version().first != 0 &&
+                rs->total_disk_size() <
+                        (config::time_series_compaction_goal_size_mbytes * 1024 * 1024 * 0.8)) {
+                *ret_cumulative_point = rs->version().first;
+                break;
+            }
+
+            // covers the case where the rowset has no delete predicate, is a singleton delta, and is NONOVERLAPPING: ret_cumulative_point advances
+            prev_version = rs->version().second;
+            *ret_cumulative_point = prev_version + 1;
+        }
+        VLOG_NOTICE
+                << "cumulative compaction time serires policy, calculate cumulative point value = "
+                << *ret_cumulative_point << " tablet = " << tablet->full_name();
+    }
+}
+
+int TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
+        Tablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets,
+        const int64_t max_compaction_score, const int64_t min_compaction_score,
+        std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version,
+        size_t* compaction_score) {
+    if (tablet->tablet_state() == TABLET_NOTREADY) {
+        return 0;
+    }
+
+    int transient_size = 0;
+    *compaction_score = 0;
+    input_rowsets->clear();
+    int64_t total_size = 0;
+
+    for (auto& rowset : candidate_rowsets) {
+        // check whether this rowset is delete version
+        if (rowset->rowset_meta()->has_delete_predicate()) {
+            *last_delete_version = rowset->version();
+            if (!input_rowsets->empty()) {
+                // we meet a delete version, and there were other versions before it.
+                // we should compact those versions before handing them over to base compaction
+                break;
+            } else {
+                // we meet a delete version, and no other versions before, skip it and continue
+                input_rowsets->clear();
+                *compaction_score = 0;
+                transient_size = 0;
+                continue;
+            }
+        }
+
+        *compaction_score += rowset->rowset_meta()->get_compaction_score();
+        total_size += rowset->rowset_meta()->total_disk_size();
+
+        transient_size += 1;
+        input_rowsets->push_back(rowset);
+
+        // Condition 1: the size of input files for compaction meets the requirement of parameter time_series_compaction_goal_size_mbytes
+        if (total_size >= (config::time_series_compaction_goal_size_mbytes * 1024 * 1024)) {

Review Comment:
   
   ```suggestion
           if (total_size >= (config::time_series_compaction_goal_size_mbytes * 1024 * 1024)) {
               if (input_rowsets->size() == 1 && !input_rowsets->front()->rowset_meta()->is_segments_overlapping()) {
                   // Only 1 non-overlapping rowset, skip it
                   input_rowsets->clear();
                   *compaction_score = 0;
               }
   ```





[GitHub] [doris] csun5285 commented on pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "csun5285 (via GitHub)" <gi...@apache.org>.
csun5285 commented on PR #20715:
URL: https://github.com/apache/doris/pull/20715#issuecomment-1590316812

   run p0




[GitHub] [doris] platoneko commented on a diff in pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "platoneko (via GitHub)" <gi...@apache.org>.
platoneko commented on code in PR #20715:
URL: https://github.com/apache/doris/pull/20715#discussion_r1229082485


##########
be/src/olap/cumulative_compaction_time_series_policy.cpp:
##########
@@ -0,0 +1,251 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/cumulative_compaction_time_series_policy.h"
+
+#include "common/logging.h"
+#include "olap/tablet.h"
+#include "util/time.h"
+namespace doris {
+
+TimeSeriesCumulativeCompactionPolicy::TimeSeriesCumulativeCompactionPolicy(
+        int64_t compaction_goal_size, int64_t compaction_file_count_threshold,
+        int64_t compaction_time_threshold_seconds)
+        : _compaction_goal_size(compaction_goal_size),
+          _compaction_file_count_threshold(compaction_file_count_threshold),
+          _compaction_time_threshold_seconds(compaction_time_threshold_seconds) {}
+
+uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet* tablet) {
+    uint32_t score = 0;
+    bool base_rowset_exist = false;
+    const int64_t point = tablet->cumulative_layer_point();
+
+    int64_t total_size = 0;
+    RowsetMetaSharedPtr first_meta;
+    int64_t first_version = INT64_MAX;
+    // NOTE: tablet._meta_lock is held
+    auto& rs_metas = tablet->tablet_meta()->all_rs_metas();
+    // check the base rowset and collect the rowsets of cumulative part
+    for (auto& rs_meta : rs_metas) {
+        if (rs_meta->start_version() < first_version) {
+            first_version = rs_meta->start_version();
+            first_meta = rs_meta;
+        }
+        // check base rowset
+        if (rs_meta->start_version() == 0) {
+            base_rowset_exist = true;
+        }
+        if (rs_meta->end_version() < point || !rs_meta->is_local()) {
+            // all_rs_metas() is not sorted, so we use _continue_ rather than _break_ here.
+            continue;
+        } else {
+            // collect the rowsets of cumulative part
+            total_size += rs_meta->total_disk_size();
+            score += rs_meta->get_compaction_score();
+        }
+    }
+
+    if (first_meta == nullptr) {
+        return 0;
+    }
+
+    // If the base version does not exist but the tablet state is RUNNING,
+    // it is abnormal: do not select this tablet and return a score of 0.
+    if (!base_rowset_exist && tablet->tablet_state() == TABLET_RUNNING) {
+        LOG(WARNING) << "tablet state is running but has no base version";
+        return 0;
+    }
+
+    // Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size
+    if (total_size >= _compaction_goal_size) {
+        return score;
+    }
+
+    // Condition 2: the number of input files reaches the threshold specified by parameter _compaction_file_count_threshold
+    if (score >= _compaction_file_count_threshold) {
+        return score;
+    }
+
+    int64_t now = UnixMillis();
+    int64_t last_cumu = tablet->last_cumu_compaction_success_time();
+    if (last_cumu != 0) {
+        int64_t cumu_interval = now - last_cumu;
+
+        // Condition 3: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_seconds
+        if (cumu_interval > _compaction_time_threshold_seconds * 1000) {
+            return score;
+        }
+    } 
+
+    return 0;
+}
+
+void TimeSeriesCumulativeCompactionPolicy::calculate_cumulative_point(
+        Tablet* tablet, const std::vector<RowsetMetaSharedPtr>& all_metas,
+        int64_t current_cumulative_point, int64_t* ret_cumulative_point) {
+    *ret_cumulative_point = Tablet::K_INVALID_CUMULATIVE_POINT;
+    if (current_cumulative_point != Tablet::K_INVALID_CUMULATIVE_POINT) {
+        // only calculate the point once.
+        // after that, cumulative point will be updated along with compaction process.
+        return;
+    }
+    // empty return
+    if (all_metas.empty()) {
+        return;
+    }
+
+    std::list<RowsetMetaSharedPtr> existing_rss;
+    for (auto& rs : all_metas) {
+        existing_rss.emplace_back(rs);
+    }
+
+    // sort the existing rowsets by version in ascending order
+    existing_rss.sort([](const RowsetMetaSharedPtr& a, const RowsetMetaSharedPtr& b) {
+        // simple because 2 versions are certainly not overlapping
+        return a->version().first < b->version().first;
+    });
+
+    // calculate promotion size
+    auto base_rowset_meta = existing_rss.begin();
+
+    if (tablet->tablet_state() == TABLET_RUNNING) {
+        // the first version of the base rowset must be zero
+        // for a tablet whose state is not TABLET_RUNNING, there may be no base version.
+        CHECK((*base_rowset_meta)->start_version() == 0);
+

Review Comment:
   I mean early return to reduce indentation.
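   
   For example, a minimal sketch of that shape, reusing only names that appear in the quoted function (not the final code):
   
   ```cpp
   void TimeSeriesCumulativeCompactionPolicy::calculate_cumulative_point(
           Tablet* tablet, const std::vector<RowsetMetaSharedPtr>& all_metas,
           int64_t current_cumulative_point, int64_t* ret_cumulative_point) {
       *ret_cumulative_point = Tablet::K_INVALID_CUMULATIVE_POINT;
       // the point is only calculated once; afterwards it moves with compaction
       if (current_cumulative_point != Tablet::K_INVALID_CUMULATIVE_POINT || all_metas.empty()) {
           return;
       }
       // early return instead of wrapping the rest of the function in an if-block
       if (tablet->tablet_state() != TABLET_RUNNING) {
           return;
       }
       // ... sort the rowset metas and walk them at one indentation level less ...
   }
   ```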





[GitHub] [doris] csun5285 commented on a diff in pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "csun5285 (via GitHub)" <gi...@apache.org>.
csun5285 commented on code in PR #20715:
URL: https://github.com/apache/doris/pull/20715#discussion_r1228012530


##########
be/src/olap/cumulative_compaction_time_series_policy.cpp:
##########
@@ -0,0 +1,255 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/cumulative_compaction_time_series_policy.h"
+
+#include "common/logging.h"
+#include "olap/tablet.h"
+#include "util/time.h"
+namespace doris {
+
+TimeSeriesCumulativeCompactionPolicy::TimeSeriesCumulativeCompactionPolicy(
+        int64_t compaction_goal_size, int64_t compaction_file_count_threshold,
+        int64_t compaction_time_threshold_seconds)
+        : _compaction_goal_size(compaction_goal_size),
+          _compaction_file_count_threshold(compaction_file_count_threshold),
+          _compaction_time_threshold_seconds(compaction_time_threshold_seconds) {}
+
+uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet* tablet) {
+    uint32_t score = 0;
+    bool base_rowset_exist = false;
+    const int64_t point = tablet->cumulative_layer_point();
+
+    int64_t total_size = 0;
+    RowsetMetaSharedPtr first_meta;
+    int64_t first_version = INT64_MAX;
+    // NOTE: tablet._meta_lock is held
+    auto& rs_metas = tablet->tablet_meta()->all_rs_metas();
+    // check the base rowset and collect the rowsets of cumulative part
+    for (auto& rs_meta : rs_metas) {
+        if (rs_meta->start_version() < first_version) {
+            first_version = rs_meta->start_version();
+            first_meta = rs_meta;
+        }
+        // check base rowset
+        if (rs_meta->start_version() == 0) {
+            base_rowset_exist = true;
+        }
+        if (rs_meta->end_version() < point || !rs_meta->is_local()) {

Review Comment:
   Base and cumulative compaction will only merge rowsets where the condition is_local() == true, cold data compaction will only merge rowsets where is_local() == false
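   
   Illustrative sketch of the split described above; `candidate_rowsets` and the two destination vectors are assumptions, not code from this PR:
   
   ```cpp
   #include <vector>
   
   // candidate_rowsets: std::vector<RowsetSharedPtr> collected under the tablet meta lock
   std::vector<RowsetSharedPtr> local_candidates;   // considered by base/cumulative compaction
   std::vector<RowsetSharedPtr> remote_candidates;  // considered by cold data compaction
   for (const auto& rs : candidate_rowsets) {
       if (rs->rowset_meta()->is_local()) {
           local_candidates.push_back(rs);
       } else {
           remote_candidates.push_back(rs);
       }
   }
   ```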
   





[GitHub] [doris] csun5285 commented on pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "csun5285 (via GitHub)" <gi...@apache.org>.
csun5285 commented on PR #20715:
URL: https://github.com/apache/doris/pull/20715#issuecomment-1588611972

   run p0




[GitHub] [doris] csun5285 commented on pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "csun5285 (via GitHub)" <gi...@apache.org>.
csun5285 commented on PR #20715:
URL: https://github.com/apache/doris/pull/20715#issuecomment-1588713367

   run p0




[GitHub] [doris] csun5285 commented on a diff in pull request #20715: [Enhancement](compaction) time-series scenario cumulative compaction policy

Posted by "csun5285 (via GitHub)" <gi...@apache.org>.
csun5285 commented on code in PR #20715:
URL: https://github.com/apache/doris/pull/20715#discussion_r1229134646


##########
be/src/common/config.h:
##########
@@ -1004,6 +1004,17 @@ DECLARE_Int32(num_broadcast_buffer);
 // semi-structure configs
 DECLARE_Bool(enable_parse_multi_dimession_array);
 
+// Currently, two compaction strategies are implemented, SIZE_BASED and TIME_SERIES.
+// In the case of time series compaction, the execution of compaction is adjusted
+// using parameters that have the prefix time_series_compaction.
+DECLARE_mString(compaction_policy);

Review Comment:
   done
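   
   As a sketch of how the string-valued `compaction_policy` mentioned in the quoted comment could select between the SIZE_BASED and TIME_SERIES strategies; the factory function, the accepted string values, and the policy class names other than TimeSeriesCumulativeCompactionPolicy are assumptions, not taken from this PR:
   
   ```cpp
   #include <memory>
   #include <string>
   
   // Hypothetical selector; only TimeSeriesCumulativeCompactionPolicy is defined in this PR.
   std::shared_ptr<CumulativeCompactionPolicy> create_cumulative_compaction_policy(
           const std::string& policy_name) {
       if (policy_name == "time_series") {
           return std::make_shared<TimeSeriesCumulativeCompactionPolicy>();
       }
       // fall back to the size-based strategy for any other value
       return std::make_shared<SizeBasedCumulativeCompactionPolicy>();
   }
   ```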


