You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by li...@apache.org on 2019/09/25 07:27:49 UTC

[incubator-doris] branch master updated: Take segments in singleton rowset into consideration upon cumulative compaction (#1866)

This is an automated email from the ASF dual-hosted git repository.

lichaoyong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 09482c9  Take segments in singleton rowset into consideration upon cumulative compaction (#1866)
09482c9 is described below

commit 09482c9f52d726ad804aaa55de91b95f0041f954
Author: lichaoyong <li...@baidu.com>
AuthorDate: Wed Sep 25 15:27:44 2019 +0800

    Take segments in singleton rowset into consideration upon cumulative compaction (#1866)
    
    Previously, cumulative compaction only took the number of rowsets into consideration.
    During streaming load, a singleton rowset may be made up of many overlapping segments.
    Scanning these overlapping segments will result in read amplification.
    To address this problem, overlapping segments should be taken into consideration
    when doing cumulative compaction to reduce read amplification.
---
 be/src/olap/cumulative_compaction.cpp |  16 +++-
 be/src/olap/olap_header.h             | 156 ----------------------------------
 be/src/olap/tablet.cpp                |   3 +
 3 files changed, 15 insertions(+), 160 deletions(-)

diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp
index 6f8510a..d89eb1f 100755
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -78,6 +78,7 @@ OLAPStatus CumulativeCompaction::pick_rowsets_to_compact() {
     RETURN_NOT_OK(check_version_continuity(candidate_rowsets));
 
     std::vector<RowsetSharedPtr> transient_rowsets;
+    size_t num_overlapping_segments = 0;
     for (size_t i = 0; i < candidate_rowsets.size() - 1; ++i) {
         // VersionHash will calculated from chosen rowsets.
         // If ultimate singleton rowset is chosen, VersionHash
@@ -85,22 +86,29 @@ OLAPStatus CumulativeCompaction::pick_rowsets_to_compact() {
         // So the ultimate singleton rowset is revserved.
         RowsetSharedPtr rowset = candidate_rowsets[i];
         if (_tablet->version_for_delete_predicate(rowset->version())) {
-            if (transient_rowsets.size() > config::min_cumulative_compaction_num_singleton_deltas) {
+            if (num_overlapping_segments >= config::min_cumulative_compaction_num_singleton_deltas) {
                 _input_rowsets = transient_rowsets;
                 break;
             }
             transient_rowsets.clear();
+            num_overlapping_segments = 0;
             continue;
         }
 
-        if (transient_rowsets.size() >= config::max_cumulative_compaction_num_singleton_deltas) {
+        if (num_overlapping_segments >= config::max_cumulative_compaction_num_singleton_deltas) {
             // the threshold of files to compacted one time
             break;
         }
+
+        if (rowset->start_version() == rowset->end_version()) {
+            num_overlapping_segments += rowset->num_segments();
+        } else {
+            num_overlapping_segments++;
+        }
         transient_rowsets.push_back(rowset); 
     }
 
-    if (transient_rowsets.size() > config::min_cumulative_compaction_num_singleton_deltas) {
+    if (num_overlapping_segments >= config::min_cumulative_compaction_num_singleton_deltas) {
         _input_rowsets = transient_rowsets;
     }
 		
@@ -113,7 +121,7 @@ OLAPStatus CumulativeCompaction::pick_rowsets_to_compact() {
 
     if (_input_rowsets.size() <= 1) {
         LOG(WARNING) << "There is no enough rowsets to cumulative compaction."
-                     << ", the size of rowsets to compact=" << candidate_rowsets.size()
+                     << " The size of rowsets to compact=" << candidate_rowsets.size()
                      << ", cumulative_point=" << _tablet->cumulative_layer_point();
         return OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS;
     }
diff --git a/be/src/olap/olap_header.h b/be/src/olap/olap_header.h
deleted file mode 100644
index daaa9a0..0000000
--- a/be/src/olap/olap_header.h
+++ /dev/null
@@ -1,156 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef DORIS_BE_SRC_OLAP_OLAP_HEADER_H
-#define DORIS_BE_SRC_OLAP_OLAP_HEADER_H
-
-#include <list>
-#include <string>
-#include <unordered_map>
-#include <vector>
-
-#include "gen_cpp/olap_file.pb.h"
-#include "gen_cpp/Types_types.h"
-#include "olap/olap_common.h"
-#include "olap/olap_define.h"
-
-namespace doris {
-// Class for managing olap table header.
-class OLAPHeader : public OLAPHeaderMessage {
-public:
-    explicit OLAPHeader() :
-            _support_reverse_version(false) {}
-
-    // for compatible header file
-    explicit OLAPHeader(const std::string& file_name) :
-            _file_name(file_name),
-            _support_reverse_version(false) {}
-
-    virtual ~OLAPHeader();
-
-    // Loads the header from disk and init, returning true on success.
-    // In load_and_init(), we will validate olap header file, which mainly include
-    // tablet schema, delta version and so on.
-    OLAPStatus load_and_init();
-    OLAPStatus load_for_check();
-
-    // Saves the header to disk, returning true on success.
-    OLAPStatus save();
-    OLAPStatus save(const std::string& file_path);
-
-    OLAPStatus init();
-
-    // Return the file name of the heade.
-    std::string file_name() const {
-        return _file_name;
-    }
-
-    // Adds a new version to the header. Do not use the proto's
-    // add_version() directly.
-    OLAPStatus add_version(Version version, VersionHash version_hash,
-                           int32_t segment_group_id, int32_t num_segments,
-                           int64_t index_size, int64_t data_size, int64_t num_rows,
-                           bool empty, const std::vector<KeyRange>* column_statistics);
-
-    OLAPStatus add_pending_version(int64_t partition_id, int64_t transaction_id,
-                                 const std::vector<std::string>* delete_conditions);
-    OLAPStatus add_pending_segment_group(int64_t transaction_id, int32_t num_segments,
-                                  int32_t pending_segment_group_id, const PUniqueId& load_id,
-                                  bool empty, const std::vector<KeyRange>* column_statistics);
-
-    // add incremental segment_group into header like "9-9" "10-10", for incremental cloning
-    OLAPStatus add_incremental_version(Version version, VersionHash version_hash,
-                                       int32_t segment_group_id, int32_t num_segments,
-                                       int64_t index_size, int64_t data_size, int64_t num_rows,
-                                       bool empty, const std::vector<KeyRange>* column_statistics);
-
-    void add_delete_condition(const DeleteConditionMessage& delete_condition, int64_t version);
-    void delete_cond_by_version(const Version& version);
-    bool is_delete_data_version(Version version);
-
-    const PPendingDelta* get_pending_delta(int64_t transaction_id) const;
-    const PPendingSegmentGroup* get_pending_segment_group(int64_t transaction_id, int32_t pending_segment_group_id) const;
-    const PDelta* get_incremental_version(Version version) const;
-
-    // Deletes a version from the header.
-    OLAPStatus delete_version(Version version);
-    OLAPStatus delete_all_versions();
-    void delete_pending_delta(int64_t transaction_id);
-    void delete_incremental_delta(Version version);
-
-    // Constructs a canonical file name (without path) for the header.
-    // eg "DailyUnitStats_PRIMARY.hdr"
-    std::string construct_file_name() const {
-        return std::string(basename(_file_name.c_str()));
-    }
-
-    // In order to prevent reverse version to appear in the shortest version
-    // path, you can call set_reverse_version(false) although schema can
-    // support reverse version in the path.
-    void set_reverse_version(bool support_reverse_version) {
-        _support_reverse_version = support_reverse_version;
-    }
-
-    // Try to select the least number of data files that can span the
-    // target_version and append these data versions to the span_versions.
-    // Return false if the target_version cannot be spanned.
-    virtual OLAPStatus select_versions_to_span(const Version& target_version,
-                                           std::vector<Version>* span_versions);
-
-    const PDelta* get_lastest_delta_version() const;
-    const PDelta* get_lastest_version() const;
-    Version get_latest_version() const;
-    const PDelta* get_delta(int index) const;
-    const PDelta* get_base_version() const;
-    const uint32_t get_cumulative_compaction_score() const;
-    const uint32_t get_base_compaction_score() const;
-    const OLAPStatus version_creation_time(const Version& version, int64_t* creation_time) const;
-
-    int file_delta_size() const {
-        return delta_size();
-    }
-    void change_file_version_to_delta();
-private:
-    void _convert_file_version_to_delta(const FileVersionMessage& version, PDelta* delta);
-
-    // full path of olap header file
-    std::string _file_name;
-
-    // If the aggregation types of all value columns in the schema are SUM,
-    // select_versions_to_span can return reverse version in the shortest
-    // version path. one can set _support_reverse_version to be false in
-    // order to prevent reverse version to appear in the shortest version path.
-    // Its default value is false.
-    bool _support_reverse_version;
-
-    // OLAP version contains two parts, [start_version, end_version]. In order
-    // to construct graph, the OLAP version has two corresponding vertex, one
-    // vertex's value is version.start_version, the other is
-    // version.end_version + 1.
-    // Use adjacency list to describe version graph.
-    std::vector<Vertex> _version_graph;
-
-    // vertex value --> vertex_index of _version_graph
-    // It is easy to find vertex index according to vertex value.
-    std::unordered_map<int, int> _vertex_helper_map;
-    
-    DISALLOW_COPY_AND_ASSIGN(OLAPHeader);
-};
-
-}  // namespace doris
-
-#endif // DORIS_BE_SRC_OLAP_OLAP_HEADER_H
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index b3d6694..cde5b89 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -609,6 +609,9 @@ const uint32_t Tablet::calc_cumulative_compaction_score() const {
         if (rs_meta->start_version() >= point) {
             score++;
         }
+        if (rs_meta->start_version() == rs_meta->end_version()) {
+            score += rs_meta->num_segments();
+        }
         if (rs_meta->start_version() == 0) {
             base_rowset_exist = true;
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org