Posted to commits@doris.apache.org by we...@apache.org on 2022/06/21 12:55:33 UTC

[doris] branch memtable_opt_rebase_bak created (now c385442d79)

This is an automated email from the ASF dual-hosted git repository.

weixiang pushed a change to branch memtable_opt_rebase_bak
in repository https://gitbox.apache.org/repos/asf/doris.git


      at c385442d79 Revert "[wx-opt-mem-feature] add profile in memtable"

This branch includes the following new commits:

     new 0d3880b415 [feature-wip](stream-load-vec) opt memtable
     new bf98cd8c7e [wx-opt-mem-feature] add profile in memtable
     new 93c2604c1f [wx-sr-fix] fix core dump when _block is empty after call to_block
     new 6d0fc5680f [wx-sr-opt] use pdqsort to replace std::sort & apply clang-format
     new c385442d79 Revert "[wx-opt-mem-feature] add profile in memtable"

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[doris] 03/05: [wx-sr-fix] fix core dump when _block is empty after call to_block

Posted by we...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

weixiang pushed a commit to branch memtable_opt_rebase_bak
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 93c2604c1f7a9a3cdf11e9efabb23c158738a03d
Author: weixiang <we...@meituan.com>
AuthorDate: Fri May 13 00:23:27 2022 +0800

    [wx-sr-fix] fix core dump when _block is empty after call to_block
---
 be/src/olap/memtable.cpp  |  4 +---
 be/src/vec/core/block.cpp | 18 ++++++++++++++++++
 be/src/vec/core/block.h   |  2 ++
 3 files changed, 21 insertions(+), 3 deletions(-)
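
For reference, the pattern this fix introduces can be shown in isolation: instead of materializing the source MutableBlock into a Block (the path that core dumped when _block ended up empty after to_block), the new add_rows overload copies the selected row indices column by column. A minimal standalone sketch of that index-based copy, using plain std::vector as an illustrative stand-in for Doris's IColumn (every name below is hypothetical, not the BE API):

    #include <cstddef>
    #include <iostream>
    #include <vector>

    // Illustrative stand-in for a column: insert_indices_from copies the rows
    // named by [row_begin, row_end) from src into dst, in that order.
    using Column = std::vector<int>;

    void insert_indices_from(Column& dst, const Column& src,
                             const int* row_begin, const int* row_end) {
        for (const int* it = row_begin; it != row_end; ++it) {
            dst.push_back(src[*it]);
        }
    }

    int main() {
        std::vector<Column> src_block = {{10, 20, 30}, {1, 2, 3}}; // two columns, three rows
        std::vector<Column> dst_block(src_block.size());           // same layout, still empty
        std::vector<int> sorted_index = {2, 0, 1};                 // plays the role of _sorted_index_in_block

        // Mirrors the shape of MutableBlock::add_rows(MutableBlock*, const int*, const int*):
        // copy rows directly between mutable blocks, with no intermediate Block.
        for (size_t i = 0; i < src_block.size(); ++i) {
            insert_indices_from(dst_block[i], src_block[i], sorted_index.data(),
                                sorted_index.data() + sorted_index.size());
        }
        std::cout << dst_block[0][0] << "\n"; // 30: rows arrive in sorted order
        return 0;
    }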

diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 7f431a7289..2c8f2a6a15 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -225,9 +225,7 @@ void MemTable::_append_sorted_block(vectorized::MutableBlock* src, vectorized::M
     for (size_t i = 0; i < row_num; i++) {
         _sorted_index_in_block.push_back(_index_for_sort[i].index_in_block);
     }
-    vectorized::Block src_block = src->to_block();
-    dst->add_rows(&src_block, _sorted_index_in_block.data(),
-                  _sorted_index_in_block.data() + row_num);
+    dst->add_rows(src, _sorted_index_in_block.data(), _sorted_index_in_block.data() + row_num);
 }
 
 void MemTable::finalize() {
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 8efc0e0e1b..0a23a729b2 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -885,6 +885,24 @@ void MutableBlock::add_rows(const Block* block, size_t row_begin, size_t length)
     }
 }
 
+void MutableBlock::add_rows(MutableBlock* block, const int* row_begin, const int* row_end) {
+    for (size_t i = 0; i < _columns.size(); ++i) {
+        auto& dst = _columns[i];
+        auto& src = *(block->get_column_by_position(i).get());
+        dst->insert_indices_from(src, row_begin, row_end);
+    }
+}
+
+void MutableBlock::add_rows(MutableBlock* block, size_t row_begin, size_t length) {
+    for (size_t i = 0; i < _columns.size(); ++i) {
+        auto& dst = _columns[i];
+        auto& src = *(block->get_column_by_position(i).get());
+        dst->insert_range_from(src, row_begin, length);
+    }
+}
+
+
+
 Block MutableBlock::to_block(int start_column) {
     return to_block(start_column, _columns.size());
 }
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 7a6f61deb9..144446e255 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -444,6 +444,8 @@ public:
     void add_row(const Block* block, int row);
     void add_rows(const Block* block, const int* row_begin, const int* row_end);
     void add_rows(const Block* block, size_t row_begin, size_t length);
+    void add_rows(MutableBlock* block, const int* row_begin, const int* row_end);
+    void add_rows(MutableBlock* block, size_t row_begin, size_t length);
 
     std::string dump_data(size_t row_limit = 100) const;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[doris] 02/05: [wx-opt-mem-feature] add profile in memtable

Posted by we...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

weixiang pushed a commit to branch memtable_opt_rebase_bak
in repository https://gitbox.apache.org/repos/asf/doris.git

commit bf98cd8c7efa74242970d421b1174b8eaec85059
Author: weixiang <we...@meituan.com>
AuthorDate: Fri Apr 29 18:04:35 2022 +0800

    [wx-opt-mem-feature] add profile in memtable
---
 be/src/olap/memtable.cpp | 151 ++++++++++++++++++++++++++---------------------
 be/src/olap/memtable.h   |  24 ++++++++
 2 files changed, 107 insertions(+), 68 deletions(-)
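
For reference, the profiling added here follows the scoped-timer pattern: each phase (insert, sort, agg, finalize) runs under a timer that accumulates elapsed time into a counter, and the totals are logged from the destructor via print_profile(). A minimal standalone sketch of that idea with simplified stand-ins (these are not the real RuntimeProfile / SCOPED_TIMER classes):

    #include <chrono>
    #include <cstdint>
    #include <iostream>

    // Simplified stand-ins: an RAII guard adds the elapsed wall time to a
    // counter when the scope ends, much like SCOPED_TIMER does for a
    // RuntimeProfile::Counter.
    struct Counter {
        int64_t total_ns = 0;
    };

    class ScopedTimer {
    public:
        explicit ScopedTimer(Counter* counter)
                : _counter(counter), _start(std::chrono::steady_clock::now()) {}
        ~ScopedTimer() {
            auto elapsed = std::chrono::steady_clock::now() - _start;
            _counter->total_ns +=
                    std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count();
        }

    private:
        Counter* _counter;
        std::chrono::steady_clock::time_point _start;
    };

    Counter g_insert_time; // plays the role of _insert_time in the patch

    void insert_rows() {
        ScopedTimer timer(&g_insert_time); // like SCOPED_TIMER(_insert_time)
        // ... the actual insert work would happen here ...
    }

    int main() {
        insert_rows();
        // Analogous to print_profile() dumping the counters at destruction time.
        std::cout << "insert time: " << g_insert_time.total_ns << " ns\n";
        return 0;
    }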

diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index e1c97f2d4d..7f431a7289 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -55,6 +55,7 @@ MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet
                 _vec_row_comparator.get(), _table_mem_pool.get(), _keys_type == KeysType::DUP_KEYS);
         _block_aggregator =
                 std::make_unique<vectorized::BlockAggregator>(_schema, _tablet_schema, true);
+        _init_profile();
     } else {
         _vec_skip_list = nullptr;
         if (_keys_type == KeysType::DUP_KEYS) {
@@ -100,6 +101,7 @@ void MemTable::_init_agg_functions(const vectorized::Block* block) {
 MemTable::~MemTable() {
     std::for_each(_row_in_blocks.begin(), _row_in_blocks.end(), std::default_delete<RowInBlock>());
     _mem_tracker->release(_mem_usage);
+    print_profile();
 }
 
 MemTable::RowCursorComparator::RowCursorComparator(const Schema* schema) : _schema(schema) {}
@@ -117,31 +119,35 @@ int MemTable::RowInBlockComparator::operator()(const RowInBlock* left,
 }
 
 bool MemTable::insert(const vectorized::Block* block, size_t row_pos, size_t num_rows) {
-    if (_is_first_insertion) {
-        _is_first_insertion = false;
-        auto cloneBlock = block->clone_without_columns();
-        _block = std::make_shared<vectorized::MutableBlock>(&cloneBlock);
-        if (_keys_type != KeysType::DUP_KEYS) {
-            _init_agg_functions(block);
+    {
+        SCOPED_TIMER(_insert_time);
+        if (_is_first_insertion) {
+            _is_first_insertion = false;
+            auto cloneBlock = block->clone_without_columns();
+            _block = std::make_shared<vectorized::MutableBlock>(&cloneBlock);
+            if (_keys_type != KeysType::DUP_KEYS) {
+                _init_agg_functions(block);
+            }
         }
-    }
-    _block->add_rows(block, row_pos, num_rows);
-    _block_bytes_usage += block->allocated_bytes() * num_rows / block->rows();
-    // Memtable is full, do not flush immediately
-    // First try to merge these blocks
-    // If the merged memtable is still full or we can not benefit a lot from merge at first
-    // Then flush the memtable into disk.
-    bool is_flush = false;
-    if (is_full()) {
-        size_t before_merge_bytes = bytes_allocated();
-        _merge();
-        size_t after_merged_bytes = bytes_allocated();
-        // TODO(weixiang): magic number here, make it configurable later.
-        if (is_full() || (after_merged_bytes >= before_merge_bytes * 2 / 3 && _merge_count == 1)) {
-            is_flush = true;
+        _block->add_rows(block, row_pos, num_rows);
+        _block_bytes_usage += block->allocated_bytes() * num_rows / block->rows();
+        // Memtable is full, do not flush immediately
+        // First try to merge these blocks
+        // If the merged memtable is still full or we can not benefit a lot from merge at first
+        // Then flush the memtable into disk.
+        bool is_flush = false;
+        if (is_full()) {
+            size_t before_merge_bytes = bytes_allocated();
+            _merge();
+            size_t after_merged_bytes = bytes_allocated();
+            // TODO(weixiang): magic number here, make it configurable later.
+            if (is_full() ||
+                (after_merged_bytes >= before_merge_bytes * 2 / 3 && _merge_count == 1)) {
+                is_flush = true;
+            }
         }
+        return is_flush;
     }
-    return is_flush;
 }
 
 size_t MemTable::bytes_allocated() const {
@@ -162,35 +168,41 @@ void MemTable::_merge() {
 }
 
 void MemTable::_agg(const bool finalize) {
-    // note that the _block had been sorted before.
-    if (_sorted_block == nullptr || _sorted_block->rows() <= 0) {
-        return;
-    }
-    vectorized::Block sorted_block = _sorted_block->to_block();
-    _block_aggregator->append_block(&sorted_block);
-    _block_aggregator->partial_sort_merged_aggregate();
-    if (finalize) {
-        _sorted_block.reset();
-    } else {
-        _sorted_block->clear_column_data();
+    {
+        SCOPED_TIMER(_agg_time);
+        // note that the _block had been sorted before.
+        if (_sorted_block == nullptr || _sorted_block->rows() <= 0) {
+            return;
+        }
+        vectorized::Block sorted_block = _sorted_block->to_block();
+        _block_aggregator->append_block(&sorted_block);
+        _block_aggregator->partial_sort_merged_aggregate();
+        if (finalize) {
+            _sorted_block.reset();
+        } else {
+            _sorted_block->clear_column_data();
+        }
     }
 }
 
 void MemTable::_sort(const bool finalize) {
-    _index_for_sort.resize(_block->rows());
-    for (uint32_t i = 0; i < _block->rows(); i++) {
-        _index_for_sort[i] = {i, i};
-    }
+    {
+        SCOPED_TIMER(_sort_time);
+        _index_for_sort.resize(_block->rows());
+        for (uint32_t i = 0; i < _block->rows(); i++) {
+            _index_for_sort[i] = {i, i};
+        }
 
-    _sort_block_by_rows();
-    _sorted_block = _block->create_same_struct_block(0);
-    _append_sorted_block(_block.get(), _sorted_block.get());
-    if (finalize) {
-        _block.reset();
-    } else {
-        _block->clear_column_data();
+        _sort_block_by_rows();
+        _sorted_block = _block->create_same_struct_block(0);
+        _append_sorted_block(_block.get(), _sorted_block.get());
+        if (finalize) {
+            _block.reset();
+        } else {
+            _block->clear_column_data();
+        }
+        _block_bytes_usage = 0;
     }
-    _block_bytes_usage = 0;
 }
 
 void MemTable::_sort_block_by_rows() {
@@ -219,32 +231,35 @@ void MemTable::_append_sorted_block(vectorized::MutableBlock* src, vectorized::M
 }
 
 void MemTable::finalize() {
-    //TODO(weixiang): check here
-    if (_block == nullptr) {
-        return;
-    }
-
-    if (_keys_type != KeysType::DUP_KEYS) {
-        // agg mode
-        if (_block->rows() > 0) {
-            _merge();
-        }
-        if (_merge_count > 1) {
-            _block = _block_aggregator->get_partial_agged_block();
-            _block_aggregator->reset_aggregator();
-            _sort(true);
-            _agg(true);
-        } else {
-            _block.reset();
-            _sorted_block.reset();
+    {
+        SCOPED_TIMER(_finalize_time);
+        //TODO(weixiang): check here
+        if (_block == nullptr) {
+            return;
         }
 
-        _block_bytes_usage = 0;
-        _sorted_block = _block_aggregator->get_partial_agged_block();
+        if (_keys_type != KeysType::DUP_KEYS) {
+            // agg mode
+            if (_block->rows() > 0) {
+                _merge();
+            }
+            if (_merge_count > 1) {
+                _block = _block_aggregator->get_partial_agged_block();
+                _block_aggregator->reset_aggregator();
+                _sort(true);
+                _agg(true);
+            } else {
+                _block.reset();
+                _sorted_block.reset();
+            }
 
-    } else {
-        // dup mode
-        _sort(true);
+            _block_bytes_usage = 0;
+            _sorted_block = _block_aggregator->get_partial_agged_block();
+
+        } else {
+            // dup mode
+            _sort(true);
+        }
     }
 }
 
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index daca1f71fc..3df94c6f97 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -23,6 +23,7 @@
 #include "olap/olap_define.h"
 #include "olap/skiplist.h"
 #include "runtime/mem_tracker.h"
+#include "util/runtime_profile.h"
 #include "util/tuple_row_zorder_compare.h"
 #include "vec/aggregate_functions/aggregate_function.h"
 #include "vec/common/string_ref.h"
@@ -162,6 +163,21 @@ private:
 
     void _append_sorted_block(vectorized::MutableBlock* src, vectorized::MutableBlock* dst);
 
+    void _init_profile() {
+        _profile.reset(new RuntimeProfile("Memtable"));
+        _insert_time = ADD_TIMER(_profile, "insert time");
+        _sort_time = ADD_TIMER(_profile, "sort time");
+        _agg_time = ADD_TIMER(_profile, "agg time");
+        _finalize_time = ADD_TIMER(_profile, "finalize time");
+    }
+
+    void print_profile() {
+        std::stringstream ss;
+        _profile->pretty_print(&ss);
+        LOG(INFO) << ss.str();
+    }
+
+
     int64_t _tablet_id;
     Schema* _schema;
     const TabletSchema* _tablet_schema;
@@ -239,6 +255,14 @@ private:
     size_t _block_bytes_usage = 0;
     size_t _agg_bytes_usage = 0;
     int _merge_count = 0;
+
+    std::unique_ptr<RuntimeProfile> _profile;
+    RuntimeProfile::Counter* _insert_time;
+    RuntimeProfile::Counter* _sort_time;
+    RuntimeProfile::Counter* _agg_time;
+    RuntimeProfile::Counter* _finalize_time;
+
+
 }; // class MemTable
 
 inline std::ostream& operator<<(std::ostream& os, const MemTable& table) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[doris] 01/05: [feature-wip](stream-load-vec) opt memtable

Posted by we...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

weixiang pushed a commit to branch memtable_opt_rebase_bak
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 0d3880b41586fe8f159a3c8ab8a395f65cfe0ef7
Author: weixiang <we...@meituan.com>
AuthorDate: Mon Apr 18 21:37:34 2022 +0800

    [feature-wip](stream-load-vec) opt memtable
---
 be/src/olap/delta_writer.cpp                       |   7 +-
 be/src/olap/memtable.cpp                           | 147 +++++++++++++++--
 be/src/olap/memtable.h                             |  43 ++++-
 be/src/vec/CMakeLists.txt                          |   1 +
 .../vec/aggregate_functions/block_aggregator.cpp   | 180 +++++++++++++++++++++
 be/src/vec/aggregate_functions/block_aggregator.h  |  70 ++++++++
 be/src/vec/core/block.cpp                          |  20 +++
 be/src/vec/core/block.h                            |  13 ++
 8 files changed, 461 insertions(+), 20 deletions(-)
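
For reference, the key behavioral change is that MemTable::insert() now returns whether a flush is needed: when the memtable fills up it first merges (sort plus partial aggregation), and only asks for a flush if the merged table is still full or the very first merge reclaimed less than a third of the memory (the 2/3 magic number noted in the TODO). A minimal standalone sketch of that decision, with illustrative names standing in for the BE members rather than the actual classes:

    #include <cstddef>
    #include <iostream>

    // Illustrative stand-ins for config::write_buffer_size, bytes_allocated()
    // and _merge_count; not the actual BE members.
    constexpr size_t kWriteBufferSize = 100;

    struct MemTableSketch {
        size_t bytes_allocated = 0;
        int merge_count = 0;

        bool is_full() const { return bytes_allocated > kWriteBufferSize; }

        // Pretend a merge frees memory by aggregating rows with equal keys.
        void merge(size_t bytes_after_merge) {
            bytes_allocated = bytes_after_merge;
            ++merge_count;
        }

        // Mirrors the flush decision added to MemTable::insert(): merge first,
        // then flush only if still full or the first merge saved less than 1/3.
        bool should_flush_after(size_t bytes_after_merge) {
            if (!is_full()) {
                return false;
            }
            size_t before_merge_bytes = bytes_allocated;
            merge(bytes_after_merge);
            return is_full() ||
                   (bytes_allocated >= before_merge_bytes * 2 / 3 && merge_count == 1);
        }
    };

    int main() {
        MemTableSketch table;
        table.bytes_allocated = 120;                        // over the buffer limit
        std::cout << table.should_flush_after(90) << "\n";  // 1: first merge saved < 1/3, so flush
        return 0;
    }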

diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 11ee242449..5cf0c9cd22 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -191,11 +191,14 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>
     }
 
     int start = 0, end = 0;
+    bool flush = false;
     const size_t num_rows = row_idxs.size();
     for (; start < num_rows;) {
         auto count = end + 1 - start;
         if (end == num_rows - 1 || (row_idxs[end + 1] - row_idxs[start]) != count) {
-            _mem_table->insert(block, row_idxs[start], count);
+            if (_mem_table->insert(block, row_idxs[start], count)) {
+                flush = true;
+            }
             start += count;
             end = start;
         } else {
@@ -203,7 +206,7 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>
         }
     }
 
-    if (_mem_table->memory_usage() >= config::write_buffer_size) {
+    if (flush || _mem_table->is_full()) {
         RETURN_NOT_OK(_flush_memtable_async());
         _reset_mem_table();
     }
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index c7f94ead4f..e1c97f2d4d 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -53,6 +53,8 @@ MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet
         // TODO: Support ZOrderComparator in the future
         _vec_skip_list = std::make_unique<VecTable>(
                 _vec_row_comparator.get(), _table_mem_pool.get(), _keys_type == KeysType::DUP_KEYS);
+        _block_aggregator =
+                std::make_unique<vectorized::BlockAggregator>(_schema, _tablet_schema, true);
     } else {
         _vec_skip_list = nullptr;
         if (_keys_type == KeysType::DUP_KEYS) {
@@ -114,27 +116,135 @@ int MemTable::RowInBlockComparator::operator()(const RowInBlock* left,
                                *_pblock, -1);
 }
 
-void MemTable::insert(const vectorized::Block* block, size_t row_pos, size_t num_rows) {
+bool MemTable::insert(const vectorized::Block* block, size_t row_pos, size_t num_rows) {
     if (_is_first_insertion) {
         _is_first_insertion = false;
         auto cloneBlock = block->clone_without_columns();
-        _input_mutable_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock);
-        _vec_row_comparator->set_block(&_input_mutable_block);
-        _output_mutable_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock);
+        _block = std::make_shared<vectorized::MutableBlock>(&cloneBlock);
         if (_keys_type != KeysType::DUP_KEYS) {
             _init_agg_functions(block);
         }
     }
-    size_t cursor_in_mutableblock = _input_mutable_block.rows();
-    size_t oldsize = _input_mutable_block.allocated_bytes();
-    _input_mutable_block.add_rows(block, row_pos, num_rows);
-    size_t newsize = _input_mutable_block.allocated_bytes();
-    _mem_usage += newsize - oldsize;
-    _mem_tracker->consume(newsize - oldsize);
-
-    for (int i = 0; i < num_rows; i++) {
-        _row_in_blocks.emplace_back(new RowInBlock {cursor_in_mutableblock + i});
-        _insert_one_row_from_block(_row_in_blocks.back());
+    _block->add_rows(block, row_pos, num_rows);
+    _block_bytes_usage += block->allocated_bytes() * num_rows / block->rows();
+    // Memtable is full, do not flush immediately
+    // First try to merge these blocks
+    // If the merged memtable is still full or we can not benefit a lot from merge at first
+    // Then flush the memtable into disk.
+    bool is_flush = false;
+    if (is_full()) {
+        size_t before_merge_bytes = bytes_allocated();
+        _merge();
+        size_t after_merged_bytes = bytes_allocated();
+        // TODO(weixiang): magic number here, make it configurable later.
+        if (is_full() || (after_merged_bytes >= before_merge_bytes * 2 / 3 && _merge_count == 1)) {
+            is_flush = true;
+        }
+    }
+    return is_flush;
+}
+
+size_t MemTable::bytes_allocated() const {
+    return _block_bytes_usage + _block_aggregator->get_bytes_usage();
+}
+
+bool MemTable::is_full() const {
+    return bytes_allocated() > config::write_buffer_size;
+}
+
+void MemTable::_merge() {
+    if (_block == nullptr || _keys_type == KeysType::DUP_KEYS) {
+        return;
+    }
+    _sort(false);
+    _agg(false);
+    _merge_count++;
+}
+
+void MemTable::_agg(const bool finalize) {
+    // note that the _block had been sorted before.
+    if (_sorted_block == nullptr || _sorted_block->rows() <= 0) {
+        return;
+    }
+    vectorized::Block sorted_block = _sorted_block->to_block();
+    _block_aggregator->append_block(&sorted_block);
+    _block_aggregator->partial_sort_merged_aggregate();
+    if (finalize) {
+        _sorted_block.reset();
+    } else {
+        _sorted_block->clear_column_data();
+    }
+}
+
+void MemTable::_sort(const bool finalize) {
+    _index_for_sort.resize(_block->rows());
+    for (uint32_t i = 0; i < _block->rows(); i++) {
+        _index_for_sort[i] = {i, i};
+    }
+
+    _sort_block_by_rows();
+    _sorted_block = _block->create_same_struct_block(0);
+    _append_sorted_block(_block.get(), _sorted_block.get());
+    if (finalize) {
+        _block.reset();
+    } else {
+        _block->clear_column_data();
+    }
+    _block_bytes_usage = 0;
+}
+
+void MemTable::_sort_block_by_rows() {
+    std::sort(_index_for_sort.begin(), _index_for_sort.end(),
+              [this](const MemTable::OrderedIndexItem& left,
+                     const MemTable::OrderedIndexItem& right) {
+                  int res = _block->compare_at(left.index_in_block, right.index_in_block,
+                                               _schema->num_key_columns(), *_block.get(), -1);
+                  if (res != 0) {
+                      return res < 0;
+                  }
+                  return left.incoming_index < right.incoming_index;
+              });
+}
+
+void MemTable::_append_sorted_block(vectorized::MutableBlock* src, vectorized::MutableBlock* dst) {
+    size_t row_num = src->rows();
+    _sorted_index_in_block.clear();
+    _sorted_index_in_block.reserve(row_num);
+    for (size_t i = 0; i < row_num; i++) {
+        _sorted_index_in_block.push_back(_index_for_sort[i].index_in_block);
+    }
+    vectorized::Block src_block = src->to_block();
+    dst->add_rows(&src_block, _sorted_index_in_block.data(),
+                  _sorted_index_in_block.data() + row_num);
+}
+
+void MemTable::finalize() {
+    //TODO(weixiang): check here
+    if (_block == nullptr) {
+        return;
+    }
+
+    if (_keys_type != KeysType::DUP_KEYS) {
+        // agg mode
+        if (_block->rows() > 0) {
+            _merge();
+        }
+        if (_merge_count > 1) {
+            _block = _block_aggregator->get_partial_agged_block();
+            _block_aggregator->reset_aggregator();
+            _sort(true);
+            _agg(true);
+        } else {
+            _block.reset();
+            _sorted_block.reset();
+        }
+
+        _block_bytes_usage = 0;
+        _sorted_block = _block_aggregator->get_partial_agged_block();
+
+    } else {
+        // dup mode
+        _sort(true);
     }
 }
 
@@ -271,6 +381,13 @@ vectorized::Block MemTable::_collect_vskiplist_results() {
 }
 
 Status MemTable::flush() {
+    if (!_skip_list) {
+        finalize();
+        if (_sorted_block == nullptr) {
+            return Status::OK();
+        }
+    }
+
     VLOG_CRITICAL << "begin to flush memtable for tablet: " << _tablet_id
                   << ", memsize: " << memory_usage() << ", rows: " << _rows;
     int64_t duration_ns = 0;
@@ -301,7 +418,7 @@ Status MemTable::_do_flush(int64_t& duration_ns) {
             RETURN_NOT_OK(st);
         }
     } else {
-        vectorized::Block block = _collect_vskiplist_results();
+        vectorized::Block block = _sorted_block->to_block();
         // beta rowset flush parallel, segment write add block is not
         // thread safe, so use tmp variable segment_write instead of
         // member variable
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 910cd92270..daca1f71fc 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -27,6 +27,7 @@
 #include "vec/aggregate_functions/aggregate_function.h"
 #include "vec/common/string_ref.h"
 #include "vec/core/block.h"
+#include "vec/aggregate_functions/block_aggregator.h"
 
 namespace doris {
 
@@ -48,11 +49,17 @@ public:
 
     int64_t tablet_id() const { return _tablet_id; }
     size_t memory_usage() const { return _mem_tracker->consumption(); }
+
     std::shared_ptr<MemTracker>& mem_tracker() { return _mem_tracker; }
 
-    inline void insert(const Tuple* tuple) { (this->*_insert_fn)(tuple); }
-    // insert tuple from (row_pos) to (row_pos+num_rows)
-    void insert(const vectorized::Block* block, size_t row_pos, size_t num_rows);
+    inline void insert(const Tuple* tuple) { (this->*_insert_fn)(tuple); };
+    //insert tuple from (row_pos) to (row_pos+num_rows)
+    bool insert(const vectorized::Block* block, size_t row_pos, size_t num_rows);
+
+    bool is_full() const;
+    size_t bytes_allocated() const;
+
+    void finalize();
 
     /// Flush
     Status flush();
@@ -146,6 +153,14 @@ private:
     // for vectorized
     void _insert_one_row_from_block(RowInBlock* row_in_block);
     void _aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_in_skiplist);
+    void _sort(const bool finalize);
+    void _sort_block_by_rows();
+
+    void _merge();
+
+    void _agg(const bool finalize);
+
+    void _append_sorted_block(vectorized::MutableBlock* src, vectorized::MutableBlock* dst);
 
     int64_t _tablet_id;
     Schema* _schema;
@@ -195,13 +210,35 @@ private:
     //for vectorized
     vectorized::MutableBlock _input_mutable_block;
     vectorized::MutableBlock _output_mutable_block;
+
+    struct OrderedIndexItem {
+        uint32_t index_in_block;
+        uint32_t incoming_index; // used for sort by column
+    };
+
+    using OrderedIndex = std::vector<OrderedIndexItem>;
+
+    OrderedIndex _index_for_sort;
+
+    std::vector<int> _sorted_index_in_block;
+
+    vectorized::MutableBlockPtr _block;
+
+    vectorized::MutableBlockPtr _sorted_block;
+
+    std::unique_ptr<vectorized::BlockAggregator> _block_aggregator;
+
     vectorized::Block _collect_vskiplist_results();
+
     bool _is_first_insertion;
 
     void _init_agg_functions(const vectorized::Block* block);
     std::vector<vectorized::AggregateFunctionPtr> _agg_functions;
     std::vector<RowInBlock*> _row_in_blocks;
     size_t _mem_usage;
+    size_t _block_bytes_usage = 0;
+    size_t _agg_bytes_usage = 0;
+    int _merge_count = 0;
 }; // class MemTable
 
 inline std::ostream& operator<<(std::ostream& os, const MemTable& table) {
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 265d6fd884..ca768bbe3d 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -39,6 +39,7 @@ set(VEC_FILES
   aggregate_functions/aggregate_function_group_concat.cpp
   aggregate_functions/aggregate_function_percentile_approx.cpp
   aggregate_functions/aggregate_function_simple_factory.cpp
+  aggregate_functions/block_aggregator.cpp
   columns/collator.cpp
   columns/column.cpp
   columns/column_array.cpp
diff --git a/be/src/vec/aggregate_functions/block_aggregator.cpp b/be/src/vec/aggregate_functions/block_aggregator.cpp
new file mode 100644
index 0000000000..201cca75e0
--- /dev/null
+++ b/be/src/vec/aggregate_functions/block_aggregator.cpp
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "block_aggregator.h"
+
+namespace doris::vectorized {
+
+BlockAggregator::BlockAggregator(const Schema* schema, const TabletSchema* tablet_schema, bool src_sorted)
+        : _schema(schema), _tablet_schema(tablet_schema), _src_sorted(src_sorted) {
+    _init_agg_functions();
+}
+
+BlockAggregator::~BlockAggregator() {
+}
+
+void BlockAggregator::_init_agg_functions() {
+    _cols_num = _schema->num_columns();
+    _key_cols_num = _schema->num_key_columns();
+    _value_cols_num = _cols_num - _key_cols_num;
+    //TODO(weixiang): save memory just use value length.
+    _agg_functions.resize(_schema->num_columns());
+    _agg_places.resize(_value_cols_num);
+    for (uint32_t cid = _schema->num_key_columns(); cid < _schema->num_columns(); ++cid) {
+        FieldAggregationMethod agg_method = _tablet_schema->column(cid).aggregation();
+        std::string agg_name = TabletColumn::get_string_by_aggregation_type(agg_method) + AGG_LOAD_SUFFIX;
+
+        std::transform(agg_name.begin(), agg_name.end(), agg_name.begin(),
+                       [](unsigned char c) { return std::tolower(c); });
+
+        // create aggregate function
+        DataTypes argument_types;
+        // TODO(weixiang): check whether writing it this way has any hidden pitfalls
+        DataTypePtr dtptr = Schema::get_data_type_ptr(*_schema->column(cid));
+        argument_types.push_back(dtptr);
+        Array params;
+        AggregateFunctionPtr function =
+                AggregateFunctionSimpleFactory::instance().get(
+                        agg_name, argument_types, params, dtptr->is_nullable());
+
+        DCHECK(function != nullptr);
+        _agg_functions[cid] = function;
+    }
+}
+
+
+void BlockAggregator::append_block(Block* block) {
+    if (block == nullptr || block->rows() <= 0){
+        return;
+    }
+    _agg_data_counters.reserve(_agg_data_counters.size() + block->rows());
+    size_t key_num = _schema->num_key_columns();
+
+    size_t same_rows = 1;
+    for (size_t i = 0; i < block->rows(); i++) {
+        if ( i+1 == block->rows() || block->compare_at(i, i+1, key_num, *block, -1) != 0) {
+            _agg_data_counters.push_back(same_rows);
+            same_rows = 0;
+        }
+        same_rows++;
+    }
+    if (_is_first_append) {
+        // this means it is appending block for the first time
+        _aggregated_block = std::make_shared<MutableBlock>(block);
+        _is_first_append = false;
+    } else {
+        _aggregated_block->add_rows(block, 0, block->rows());
+    }
+}
+
+/**
+ * @brief aggregate sorted block
+ * 1. _agg_data_counters saves, for each key group, the number of following rows to aggregate in the partially sorted block
+ * 2. first_row_idx records the first row number of each group of rows with the same keys.
+ *
+ * 
+ * TODO(weixiang):
+ *  1. refactor function partial_sort_merged_aggregate: split it into multiple functions (init, etc.)
+ */
+
+void BlockAggregator::partial_sort_merged_aggregate() {
+    DCHECK(!_agg_data_counters.empty());
+    std::vector<int> first_row_idx; // TODO(weixiang): add into member variables
+    std::vector<MutableColumnPtr> aggregated_cols;
+    first_row_idx.reserve(_agg_data_counters.size());
+    int row_pos = _cumulative_agg_num;
+    for (size_t i = 0; i < _agg_data_counters.size(); i++) {
+        first_row_idx.push_back(row_pos);
+        row_pos += _agg_data_counters[i];    
+    }
+    auto col_ids = _schema->column_ids();
+    size_t agged_row_num = first_row_idx.size();
+    // for keys:
+    for (size_t cid = 0; cid < _key_cols_num; cid++) {
+        
+        MutableColumnPtr key_col =
+                _schema->get_data_type_ptr(*_schema->column(col_ids[cid]))->create_column();
+        key_col->insert_indices_from(*_aggregated_block->mutable_columns()[cid],
+                                     first_row_idx.data(),
+                                     first_row_idx.data() + agged_row_num);
+        aggregated_cols.emplace_back(std::move(key_col));
+    }
+
+    // init agged place for values:
+    for (size_t cid = _key_cols_num; cid < _cols_num; cid++) {
+        size_t place_size = _agg_functions[cid]->size_of_data();
+        _agg_places[cid - _key_cols_num] = new char[place_size * agged_row_num];
+        for (auto i = 0; i < agged_row_num; i++) {
+           AggregateDataPtr place = _agg_places[cid - _key_cols_num] + place_size * i;
+           _agg_functions[cid]->create(place);
+        }
+        
+    }
+    
+    // do agg
+    for (size_t cid = _key_cols_num; cid < _cols_num; cid++) {
+        size_t place_size = _agg_functions[cid]->size_of_data();
+        auto* src_value_col_ptr = _aggregated_block->mutable_columns()[cid].get();
+        size_t agg_begin_idx = 0;
+        
+        for (size_t i = 0; i < agged_row_num; i++) {
+            AggregateDataPtr place = _agg_places[cid - _key_cols_num] + place_size * i;
+            _agg_functions[cid]->add_batch_range(
+                    agg_begin_idx,
+                    agg_begin_idx + _agg_data_counters[i] - 1, place,
+                    const_cast<const doris::vectorized::IColumn**>(&src_value_col_ptr), nullptr);
+            agg_begin_idx += _agg_data_counters[i];
+        }
+    }
+
+    // move to result column
+    for (size_t value_col_idx = 0; value_col_idx < _value_cols_num; value_col_idx++) {
+        size_t place_size = _agg_functions[value_col_idx + _key_cols_num]->size_of_data();
+        MutableColumnPtr dst_value_col_ptr =
+                _schema->get_data_type_ptr(*_schema->column(col_ids[value_col_idx + _key_cols_num]))
+                        ->create_column();
+        for (size_t i = 0; i < first_row_idx.size(); i++) {
+            _agg_functions[value_col_idx + _key_cols_num]->insert_result_into(
+                    _agg_places[value_col_idx] + i * place_size,
+                    *reinterpret_cast<doris::vectorized::IColumn*>(dst_value_col_ptr.get()));
+        }
+        aggregated_cols.emplace_back(std::move(dst_value_col_ptr));
+    }
+
+    _aggregated_block->clear_column_data();
+    _aggregated_block->append_from_columns(aggregated_cols, agged_row_num);
+    _agg_data_counters.clear();
+    _cumulative_agg_num += agged_row_num;
+
+    for(auto place : _agg_places) {
+        // free aggregated memory
+        delete[] place;
+    }
+    
+
+}
+
+
+
+size_t BlockAggregator::get_bytes_usage() const{
+    if(UNLIKELY(_aggregated_block == nullptr)) {
+        return 0;
+    }
+    return _aggregated_block->allocated_bytes();
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/aggregate_functions/block_aggregator.h b/be/src/vec/aggregate_functions/block_aggregator.h
new file mode 100644
index 0000000000..510e614dd2
--- /dev/null
+++ b/be/src/vec/aggregate_functions/block_aggregator.h
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#pragma once
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/aggregate_functions/aggregate_function_reader.h"
+#include "vec/aggregate_functions/aggregate_function_simple_factory.h"
+#include "vec/core/block.h"
+#include "olap/schema.h"
+
+namespace doris::vectorized {
+
+using BlockPtr = std::shared_ptr<Block>;
+using MutableBlockPtr = std::shared_ptr<MutableBlock>;
+
+class BlockAggregator {
+
+
+public:
+    BlockAggregator(const Schema* schema, const TabletSchema* tablet_schema, bool src_sorted);
+    ~BlockAggregator();
+    void append_block(Block* block);
+    void partial_sort_merged_aggregate();
+    void _init_agg_functions();
+    size_t get_bytes_usage() const;
+
+    MutableBlockPtr get_partial_agged_block() {
+        return _aggregated_block;
+    }
+
+    void reset_aggregator() {
+        _aggregated_block.reset();
+        _agg_data_counters.clear();
+        _cumulative_agg_num = 0;
+        _is_first_append = true;
+    }
+
+private:
+    bool _is_first_append = true;
+    size_t _key_cols_num;
+    size_t _value_cols_num;
+    size_t _cumulative_agg_num = 0;
+    size_t _cols_num;
+    const Schema* _schema;
+    const TabletSchema* _tablet_schema;
+    bool _src_sorted;
+    MutableBlockPtr _aggregated_block;
+    std::vector<int> _agg_data_counters;
+    std::vector<AggregateFunctionPtr> _agg_functions;
+
+    std::vector<AggregateDataPtr> _agg_places;
+    
+};
+
+} // namespace
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index aa482fcfbf..8efc0e0e1b 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -897,6 +897,14 @@ Block MutableBlock::to_block(int start_column, int end_column) {
     return {columns_with_schema};
 }
 
+void MutableBlock::clear_column_data() noexcept {
+    for (auto& col : _columns) {
+        if (col) {
+            col->clear();
+        }
+    }
+}
+
 std::string MutableBlock::dump_data(size_t row_limit) const {
     std::vector<std::string> headers;
     std::vector<size_t> headers_size;
@@ -956,6 +964,18 @@ std::unique_ptr<Block> Block::create_same_struct_block(size_t size) const {
     return temp_block;
 }
 
+//TODO(weixiang): unique_ptr?
+std::shared_ptr<MutableBlock> MutableBlock::create_same_struct_block(size_t size) const {
+    Block temp_block;
+    for (const auto& d : _data_types) {
+        auto column = d->create_column();
+        column->resize(size);
+        temp_block.insert({std::move(column), d, ""});
+    }
+    auto result = std::make_shared<MutableBlock>(std::move(temp_block));
+    return result;
+}
+
 void Block::shrink_char_type_column_suffix_zero(const std::vector<size_t>& char_type_idx) {
     for (auto idx : char_type_idx) {
         if (idx < data.size()) {
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 729f531291..7a6f61deb9 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -352,12 +352,25 @@ public:
     size_t rows() const;
     size_t columns() const { return _columns.size(); }
 
+
+    std::shared_ptr<MutableBlock> create_same_struct_block(size_t size) const;
+
+    void clear_column_data() noexcept;
+
     bool empty() const { return rows() == 0; }
 
     MutableColumns& mutable_columns() { return _columns; }
 
     void set_muatable_columns(MutableColumns&& columns) { _columns = std::move(columns); }
 
+    void append_from_columns(MutableColumns& columns, size_t length) {
+        DCHECK(_columns.size() == columns.size());
+        for (size_t i = 0; i < _columns.size(); i++) {
+            DCHECK(columns[i]->size() >= length);
+            _columns[i]->insert_range_from(*columns[i], 0, length);
+        }
+    }
+
     DataTypes& data_types() { return _data_types; }
 
     MutableColumnPtr& get_column_by_position(size_t position) { return _columns[position]; }
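
For reference, the aggregation path in block_aggregator.cpp above groups consecutive rows with equal keys: append_block() records each run length in _agg_data_counters, and partial_sort_merged_aggregate() turns those counts into first_row_idx offsets so each run collapses into one output row. A minimal standalone sketch of that run counting over a single illustrative key column (the real code compares full key prefixes via Block::compare_at):

    #include <iostream>
    #include <vector>

    int main() {
        // Stand-in for one sorted key column of the appended block.
        std::vector<int> sorted_keys = {1, 1, 2, 3, 3, 3};

        // Like append_block(): count how many consecutive rows share a key.
        std::vector<int> counters;
        int same_rows = 1;
        for (size_t i = 0; i < sorted_keys.size(); ++i) {
            if (i + 1 == sorted_keys.size() || sorted_keys[i] != sorted_keys[i + 1]) {
                counters.push_back(same_rows);
                same_rows = 0;
            }
            ++same_rows;
        }

        // Like partial_sort_merged_aggregate(): the first row of each run is
        // where the key is taken from, and the whole run folds into one row.
        std::vector<int> first_row_idx;
        int row_pos = 0;
        for (int count : counters) {
            first_row_idx.push_back(row_pos);
            row_pos += count;
        }

        for (size_t i = 0; i < counters.size(); ++i) {
            std::cout << "run of " << counters[i] << " rows starting at row "
                      << first_row_idx[i] << "\n";
        }
        return 0;
    }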


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[doris] 05/05: Revert "[wx-opt-mem-feature] add profile in memtable"

Posted by we...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

weixiang pushed a commit to branch memtable_opt_rebase_bak
in repository https://gitbox.apache.org/repos/asf/doris.git

commit c385442d79578808619aef5c7649bf22051be83e
Author: spaces-x <we...@gmail.com>
AuthorDate: Tue Jun 21 20:54:42 2022 +0800

    Revert "[wx-opt-mem-feature] add profile in memtable"
    
    This reverts commit bf98cd8c7efa74242970d421b1174b8eaec85059.
---
 be/src/olap/memtable.cpp | 151 +++++++++++++++++++++--------------------------
 be/src/olap/memtable.h   |  24 +-------
 2 files changed, 69 insertions(+), 106 deletions(-)

diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index eebaed0baf..b7d1514688 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -55,7 +55,6 @@ MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet
                 _vec_row_comparator.get(), _table_mem_pool.get(), _keys_type == KeysType::DUP_KEYS);
         _block_aggregator =
                 std::make_unique<vectorized::BlockAggregator>(_schema, _tablet_schema, true);
-        _init_profile();
     } else {
         _vec_skip_list = nullptr;
         if (_keys_type == KeysType::DUP_KEYS) {
@@ -101,7 +100,6 @@ void MemTable::_init_agg_functions(const vectorized::Block* block) {
 MemTable::~MemTable() {
     std::for_each(_row_in_blocks.begin(), _row_in_blocks.end(), std::default_delete<RowInBlock>());
     _mem_tracker->release(_mem_usage);
-    print_profile();
 }
 
 MemTable::RowCursorComparator::RowCursorComparator(const Schema* schema) : _schema(schema) {}
@@ -119,35 +117,31 @@ int MemTable::RowInBlockComparator::operator()(const RowInBlock* left,
 }
 
 bool MemTable::insert(const vectorized::Block* block, size_t row_pos, size_t num_rows) {
-    {
-        SCOPED_TIMER(_insert_time);
-        if (_is_first_insertion) {
-            _is_first_insertion = false;
-            auto cloneBlock = block->clone_without_columns();
-            _block = std::make_shared<vectorized::MutableBlock>(&cloneBlock);
-            if (_keys_type != KeysType::DUP_KEYS) {
-                _init_agg_functions(block);
-            }
+    if (_is_first_insertion) {
+        _is_first_insertion = false;
+        auto cloneBlock = block->clone_without_columns();
+        _block = std::make_shared<vectorized::MutableBlock>(&cloneBlock);
+        if (_keys_type != KeysType::DUP_KEYS) {
+            _init_agg_functions(block);
         }
-        _block->add_rows(block, row_pos, num_rows);
-        _block_bytes_usage += block->allocated_bytes() * num_rows / block->rows();
-        // Memtable is full, do not flush immediately
-        // First try to merge these blocks
-        // If the merged memtable is still full or we can not benefit a lot from merge at first
-        // Then flush the memtable into disk.
-        bool is_flush = false;
-        if (is_full()) {
-            size_t before_merge_bytes = bytes_allocated();
-            _merge();
-            size_t after_merged_bytes = bytes_allocated();
-            // TODO(weixiang): magic number here, make it configurable later.
-            if (is_full() ||
-                (after_merged_bytes >= before_merge_bytes * 2 / 3 && _merge_count == 1)) {
-                is_flush = true;
-            }
+    }
+    _block->add_rows(block, row_pos, num_rows);
+    _block_bytes_usage += block->allocated_bytes() * num_rows / block->rows();
+    // Memtable is full, do not flush immediately
+    // First try to merge these blocks
+    // If the merged memtable is still full or we can not benefit a lot from merge at first
+    // Then flush the memtable into disk.
+    bool is_flush = false;
+    if (is_full()) {
+        size_t before_merge_bytes = bytes_allocated();
+        _merge();
+        size_t after_merged_bytes = bytes_allocated();
+        // TODO(weixiang): magic number here, make it configurable later.
+        if (is_full() || (after_merged_bytes >= before_merge_bytes * 2 / 3 && _merge_count == 1)) {
+            is_flush = true;
         }
-        return is_flush;
     }
+    return is_flush;
 }
 
 size_t MemTable::bytes_allocated() const {
@@ -168,41 +162,35 @@ void MemTable::_merge() {
 }
 
 void MemTable::_agg(const bool finalize) {
-    {
-        SCOPED_TIMER(_agg_time);
-        // note that the _block had been sorted before.
-        if (_sorted_block == nullptr || _sorted_block->rows() <= 0) {
-            return;
-        }
-        vectorized::Block sorted_block = _sorted_block->to_block();
-        _block_aggregator->append_block(&sorted_block);
-        _block_aggregator->partial_sort_merged_aggregate();
-        if (finalize) {
-            _sorted_block.reset();
-        } else {
-            _sorted_block->clear_column_data();
-        }
+    // note that the _block had been sorted before.
+    if (_sorted_block == nullptr || _sorted_block->rows() <= 0) {
+        return;
+    }
+    vectorized::Block sorted_block = _sorted_block->to_block();
+    _block_aggregator->append_block(&sorted_block);
+    _block_aggregator->partial_sort_merged_aggregate();
+    if (finalize) {
+        _sorted_block.reset();
+    } else {
+        _sorted_block->clear_column_data();
     }
 }
 
 void MemTable::_sort(const bool finalize) {
-    {
-        SCOPED_TIMER(_sort_time);
-        _index_for_sort.resize(_block->rows());
-        for (uint32_t i = 0; i < _block->rows(); i++) {
-            _index_for_sort[i] = {i, i};
-        }
+    _index_for_sort.resize(_block->rows());
+    for (uint32_t i = 0; i < _block->rows(); i++) {
+        _index_for_sort[i] = {i, i};
+    }
 
-        _sort_block_by_rows();
-        _sorted_block = _block->create_same_struct_block(0);
-        _append_sorted_block(_block.get(), _sorted_block.get());
-        if (finalize) {
-            _block.reset();
-        } else {
-            _block->clear_column_data();
-        }
-        _block_bytes_usage = 0;
+    _sort_block_by_rows();
+    _sorted_block = _block->create_same_struct_block(0);
+    _append_sorted_block(_block.get(), _sorted_block.get());
+    if (finalize) {
+        _block.reset();
+    } else {
+        _block->clear_column_data();
     }
+    _block_bytes_usage = 0;
 }
 
 void MemTable::_sort_block_by_rows() {
@@ -229,35 +217,32 @@ void MemTable::_append_sorted_block(vectorized::MutableBlock* src, vectorized::M
 }
 
 void MemTable::finalize() {
-    {
-        SCOPED_TIMER(_finalize_time);
-        //TODO(weixiang): check here
-        if (_block == nullptr) {
-            return;
-        }
-
-        if (_keys_type != KeysType::DUP_KEYS) {
-            // agg mode
-            if (_block->rows() > 0) {
-                _merge();
-            }
-            if (_merge_count > 1) {
-                _block = _block_aggregator->get_partial_agged_block();
-                _block_aggregator->reset_aggregator();
-                _sort(true);
-                _agg(true);
-            } else {
-                _block.reset();
-                _sorted_block.reset();
-            }
-
-            _block_bytes_usage = 0;
-            _sorted_block = _block_aggregator->get_partial_agged_block();
+    //TODO(weixiang): check here
+    if (_block == nullptr) {
+        return;
+    }
 
-        } else {
-            // dup mode
+    if (_keys_type != KeysType::DUP_KEYS) {
+        // agg mode
+        if (_block->rows() > 0) {
+            _merge();
+        }
+        if (_merge_count > 1) {
+            _block = _block_aggregator->get_partial_agged_block();
+            _block_aggregator->reset_aggregator();
             _sort(true);
+            _agg(true);
+        } else {
+            _block.reset();
+            _sorted_block.reset();
         }
+
+        _block_bytes_usage = 0;
+        _sorted_block = _block_aggregator->get_partial_agged_block();
+
+    } else {
+        // dup mode
+        _sort(true);
     }
 }
 
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 37e38d7b56..d7bdf3b260 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -24,12 +24,11 @@
 #include "olap/skiplist.h"
 #include "runtime/mem_tracker.h"
 #include "util/pdqsort.h"
-#include "util/runtime_profile.h"
 #include "util/tuple_row_zorder_compare.h"
 #include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/aggregate_functions/block_aggregator.h"
 #include "vec/common/string_ref.h"
 #include "vec/core/block.h"
-#include "vec/aggregate_functions/block_aggregator.h"
 
 namespace doris {
 
@@ -164,20 +163,6 @@ private:
 
     void _append_sorted_block(vectorized::MutableBlock* src, vectorized::MutableBlock* dst);
 
-    void _init_profile() {
-        _profile.reset(new RuntimeProfile("Memtable"));
-        _insert_time = ADD_TIMER(_profile, "insert time");
-        _sort_time = ADD_TIMER(_profile, "sort time");
-        _agg_time = ADD_TIMER(_profile, "agg time");
-        _finalize_time = ADD_TIMER(_profile, "finalize time");
-    }
-
-    void print_profile() {
-        std::stringstream ss;
-        _profile->pretty_print(&ss);
-        LOG(INFO) << ss.str();
-    }
-
     int64_t _tablet_id;
     Schema* _schema;
     const TabletSchema* _tablet_schema;
@@ -255,13 +240,6 @@ private:
     size_t _block_bytes_usage = 0;
     size_t _agg_bytes_usage = 0;
     int _merge_count = 0;
-
-    std::unique_ptr<RuntimeProfile> _profile;
-    RuntimeProfile::Counter* _insert_time;
-    RuntimeProfile::Counter* _sort_time;
-    RuntimeProfile::Counter* _agg_time;
-    RuntimeProfile::Counter* _finalize_time;
-
 }; // class MemTable
 
 inline std::ostream& operator<<(std::ostream& os, const MemTable& table) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[doris] 04/05: [wx-sr-opt] use pdqsort to replace std::sort & apply clang-format

Posted by we...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

weixiang pushed a commit to branch memtable_opt_rebase_bak
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 6d0fc5680f260da5dc1e4416d08ee67c01b56f54
Author: weixiang <we...@meituan.com>
AuthorDate: Mon May 30 15:31:54 2022 +0800

    [wx-sr-opt] use pdqsort to replace std::sort & apply clang-format
---
 be/src/olap/memtable.cpp |  20 +-
 be/src/olap/memtable.h   |   3 +-
 be/src/util/pdqsort.h    | 601 +++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 612 insertions(+), 12 deletions(-)
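
For reference, the swap is drop-in: pdqsort from the newly added util/pdqsort.h takes the same (begin, end, comparator) arguments as std::sort, so only the call site changes. A small standalone sketch of the call shape, assuming the BE include path so util/pdqsort.h resolves; the key vector and index struct below are illustrative, not the actual MemTable members:

    #include <cstdint>
    #include <iostream>
    #include <vector>

    #include "util/pdqsort.h" // added by this commit; assumes the BE include path

    struct OrderedIndexItem {
        uint32_t index_in_block;
        uint32_t incoming_index; // ties broken by arrival order, as in _sort_block_by_rows()
    };

    int main() {
        std::vector<int> keys = {3, 1, 3, 2}; // stand-in for the block's key columns
        std::vector<OrderedIndexItem> index = {{0, 0}, {1, 1}, {2, 2}, {3, 3}};

        // Same comparator shape as the lambda in MemTable::_sort_block_by_rows():
        // order by key first, then fall back to incoming_index for stability.
        pdqsort(index.begin(), index.end(),
                [&keys](const OrderedIndexItem& left, const OrderedIndexItem& right) {
                    if (keys[left.index_in_block] != keys[right.index_in_block]) {
                        return keys[left.index_in_block] < keys[right.index_in_block];
                    }
                    return left.incoming_index < right.incoming_index;
                });

        for (const auto& item : index) {
            std::cout << item.index_in_block << " "; // prints: 1 3 0 2
        }
        std::cout << "\n";
        return 0;
    }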

diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 2c8f2a6a15..eebaed0baf 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -206,16 +206,16 @@ void MemTable::_sort(const bool finalize) {
 }
 
 void MemTable::_sort_block_by_rows() {
-    std::sort(_index_for_sort.begin(), _index_for_sort.end(),
-              [this](const MemTable::OrderedIndexItem& left,
-                     const MemTable::OrderedIndexItem& right) {
-                  int res = _block->compare_at(left.index_in_block, right.index_in_block,
-                                               _schema->num_key_columns(), *_block.get(), -1);
-                  if (res != 0) {
-                      return res < 0;
-                  }
-                  return left.incoming_index < right.incoming_index;
-              });
+    pdqsort(_index_for_sort.begin(), _index_for_sort.end(),
+            [this](const MemTable::OrderedIndexItem& left,
+                   const MemTable::OrderedIndexItem& right) {
+                int res = _block->compare_at(left.index_in_block, right.index_in_block,
+                                             _schema->num_key_columns(), *_block.get(), -1);
+                if (res != 0) {
+                    return res < 0;
+                }
+                return left.incoming_index < right.incoming_index;
+            });
 }
 
 void MemTable::_append_sorted_block(vectorized::MutableBlock* src, vectorized::MutableBlock* dst) {
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 3df94c6f97..37e38d7b56 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -23,6 +23,7 @@
 #include "olap/olap_define.h"
 #include "olap/skiplist.h"
 #include "runtime/mem_tracker.h"
+#include "util/pdqsort.h"
 #include "util/runtime_profile.h"
 #include "util/tuple_row_zorder_compare.h"
 #include "vec/aggregate_functions/aggregate_function.h"
@@ -177,7 +178,6 @@ private:
         LOG(INFO) << ss.str();
     }
 
-
     int64_t _tablet_id;
     Schema* _schema;
     const TabletSchema* _tablet_schema;
@@ -262,7 +262,6 @@ private:
     RuntimeProfile::Counter* _agg_time;
     RuntimeProfile::Counter* _finalize_time;
 
-
 }; // class MemTable
 
 inline std::ostream& operator<<(std::ostream& os, const MemTable& table) {
diff --git a/be/src/util/pdqsort.h b/be/src/util/pdqsort.h
new file mode 100644
index 0000000000..44d6e53f0e
--- /dev/null
+++ b/be/src/util/pdqsort.h
@@ -0,0 +1,601 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Pattern-defeating quicksort.
+// This code is copied from https://github.com/orlp/pdqsort
+
+/*
+    pdqsort.h - Pattern-defeating quicksort.
+    Copyright (c) 2021 Orson Peters
+    This software is provided 'as-is', without any express or implied warranty. In no event will the
+    authors be held liable for any damages arising from the use of this software.
+    Permission is granted to anyone to use this software for any purpose, including commercial
+    applications, and to alter it and redistribute it freely, subject to the following restrictions:
+    1. The origin of this software must not be misrepresented; you must not claim that you wrote the
+       original software. If you use this software in a product, an acknowledgment in the product
+       documentation would be appreciated but is not required.
+    2. Altered source versions must be plainly marked as such, and must not be misrepresented as
+       being the original software.
+    3. This notice may not be removed or altered from any source distribution.
+*/
+
+#ifndef PDQSORT_H
+#define PDQSORT_H
+
+#include <algorithm>
+#include <cstddef>
+#include <functional>
+#include <utility>
+#include <iterator>
+
+#if __cplusplus >= 201103L
+#include <cstdint>
+#include <type_traits>
+#define PDQSORT_PREFER_MOVE(x) std::move(x)
+#else
+#define PDQSORT_PREFER_MOVE(x) (x)
+#endif
+
+namespace pdqsort_detail {
+enum {
+    // Partitions below this size are sorted using insertion sort.
+    insertion_sort_threshold = 24,
+
+    // Partitions above this size use Tukey's ninther to select the pivot.
+    ninther_threshold = 128,
+
+    // When we detect an already sorted partition, attempt an insertion sort that allows this
+    // amount of element moves before giving up.
+    partial_insertion_sort_limit = 8,
+
+    // Must be multiple of 8 due to loop unrolling, and < 256 to fit in unsigned char.
+    block_size = 64,
+
+    // Cacheline size, assumes power of two.
+    cacheline_size = 64
+
+};
+
+#if __cplusplus >= 201103L
+template <class T>
+struct is_default_compare : std::false_type {};
+template <class T>
+struct is_default_compare<std::less<T>> : std::true_type {};
+template <class T>
+struct is_default_compare<std::greater<T>> : std::true_type {};
+#endif
+
+// Returns floor(log2(n)), assumes n > 0.
+template <class T>
+inline int log2(T n) {
+    int log = 0;
+    while (n >>= 1) ++log;
+    return log;
+}
+
+// Sorts [begin, end) using insertion sort with the given comparison function.
+template <class Iter, class Compare>
+inline void insertion_sort(Iter begin, Iter end, Compare comp) {
+    typedef typename std::iterator_traits<Iter>::value_type T;
+    if (begin == end) return;
+
+    for (Iter cur = begin + 1; cur != end; ++cur) {
+        Iter sift = cur;
+        Iter sift_1 = cur - 1;
+
+        // Compare first so we can avoid 2 moves for an element already positioned correctly.
+        if (comp(*sift, *sift_1)) {
+            T tmp = PDQSORT_PREFER_MOVE(*sift);
+
+            do {
+                *sift-- = PDQSORT_PREFER_MOVE(*sift_1);
+            } while (sift != begin && comp(tmp, *--sift_1));
+
+            *sift = PDQSORT_PREFER_MOVE(tmp);
+        }
+    }
+}
+
+// Sorts [begin, end) using insertion sort with the given comparison function. Assumes
+// *(begin - 1) is an element smaller than or equal to any element in [begin, end).
+template <class Iter, class Compare>
+inline void unguarded_insertion_sort(Iter begin, Iter end, Compare comp) {
+    typedef typename std::iterator_traits<Iter>::value_type T;
+    if (begin == end) return;
+
+    for (Iter cur = begin + 1; cur != end; ++cur) {
+        Iter sift = cur;
+        Iter sift_1 = cur - 1;
+
+        // Compare first so we can avoid 2 moves for an element already positioned correctly.
+        if (comp(*sift, *sift_1)) {
+            T tmp = PDQSORT_PREFER_MOVE(*sift);
+
+            do {
+                *sift-- = PDQSORT_PREFER_MOVE(*sift_1);
+            } while (comp(tmp, *--sift_1));
+
+            *sift = PDQSORT_PREFER_MOVE(tmp);
+        }
+    }
+}
+
+// Attempts to use insertion sort on [begin, end). Will return false and abort sorting if more
+// than partial_insertion_sort_limit elements were moved. Otherwise it will successfully sort
+// and return true.
+template <class Iter, class Compare>
+inline bool partial_insertion_sort(Iter begin, Iter end, Compare comp) {
+    typedef typename std::iterator_traits<Iter>::value_type T;
+    if (begin == end) return true;
+
+    std::size_t limit = 0;
+    for (Iter cur = begin + 1; cur != end; ++cur) {
+        Iter sift = cur;
+        Iter sift_1 = cur - 1;
+
+        // Compare first so we can avoid 2 moves for an element already positioned correctly.
+        if (comp(*sift, *sift_1)) {
+            T tmp = PDQSORT_PREFER_MOVE(*sift);
+
+            do {
+                *sift-- = PDQSORT_PREFER_MOVE(*sift_1);
+            } while (sift != begin && comp(tmp, *--sift_1));
+
+            *sift = PDQSORT_PREFER_MOVE(tmp);
+            limit += cur - sift;
+        }
+
+        if (limit > partial_insertion_sort_limit) return false;
+    }
+
+    return true;
+}
+
+template <class Iter, class Compare>
+inline void sort2(Iter a, Iter b, Compare comp) {
+    if (comp(*b, *a)) std::iter_swap(a, b);
+}
+
+// Sorts the elements *a, *b and *c using comparison function comp.
+template <class Iter, class Compare>
+inline void sort3(Iter a, Iter b, Iter c, Compare comp) {
+    sort2(a, b, comp);
+    sort2(b, c, comp);
+    sort2(a, b, comp);
+}
+
+template <class T>
+inline T* align_cacheline(T* p) {
+#if defined(UINTPTR_MAX) && __cplusplus >= 201103L
+    std::uintptr_t ip = reinterpret_cast<std::uintptr_t>(p);
+#else
+    std::size_t ip = reinterpret_cast<std::size_t>(p);
+#endif
+    ip = (ip + cacheline_size - 1) & -cacheline_size;
+    return reinterpret_cast<T*>(ip);
+}
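
For illustration only (not part of the committed header): (ip + cacheline_size - 1) & -cacheline_size rounds an address up to the next multiple of cacheline_size, so the offset buffers used by the branchless partitioner below start on a cache-line boundary. A small worked check of the arithmetic:

    #include <cassert>
    #include <cstdint>

    int main() {
        const std::uintptr_t cacheline_size = 64;
        std::uintptr_t ip = 1000;                                  // not 64-byte aligned
        std::uintptr_t aligned = (ip + cacheline_size - 1) & -cacheline_size;
        assert(aligned == 1024);                                   // rounded up to the next multiple of 64
        assert((aligned & (cacheline_size - 1)) == 0);             // and therefore cache-line aligned
        return 0;
    }
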
+
+template <class Iter>
+inline void swap_offsets(Iter first, Iter last, unsigned char* offsets_l, unsigned char* offsets_r,
+                         size_t num, bool use_swaps) {
+    typedef typename std::iterator_traits<Iter>::value_type T;
+    if (use_swaps) {
+        // This case is needed for the descending distribution, where we need
+        // to have proper swapping for pdqsort to remain O(n).
+        for (size_t i = 0; i < num; ++i) {
+            std::iter_swap(first + offsets_l[i], last - offsets_r[i]);
+        }
+    } else if (num > 0) {
+        Iter l = first + offsets_l[0];
+        Iter r = last - offsets_r[0];
+        T tmp(PDQSORT_PREFER_MOVE(*l));
+        *l = PDQSORT_PREFER_MOVE(*r);
+        for (size_t i = 1; i < num; ++i) {
+            l = first + offsets_l[i];
+            *r = PDQSORT_PREFER_MOVE(*l);
+            r = last - offsets_r[i];
+            *l = PDQSORT_PREFER_MOVE(*r);
+        }
+        *r = PDQSORT_PREFER_MOVE(tmp);
+    }
+}
+
+// Partitions [begin, end) around pivot *begin using comparison function comp. Elements equal
+// to the pivot are put in the right-hand partition. Returns the position of the pivot after
+// partitioning and whether the passed sequence already was correctly partitioned. Assumes the
+// pivot is a median of at least 3 elements and that [begin, end) is at least
+// insertion_sort_threshold long. Uses branchless partitioning.
+template <class Iter, class Compare>
+inline std::pair<Iter, bool> partition_right_branchless(Iter begin, Iter end, Compare comp) {
+    typedef typename std::iterator_traits<Iter>::value_type T;
+
+    // Move pivot into local for speed.
+    T pivot(PDQSORT_PREFER_MOVE(*begin));
+    Iter first = begin;
+    Iter last = end;
+
+    // Find the first element greater than or equal to the pivot (the median of 3 guarantees
+    // this exists).
+    while (comp(*++first, pivot))
+        ;
+
+    // Find the first element strictly smaller than the pivot. We have to guard this search if
+    // there was no element before *first.
+    if (first - 1 == begin)
+        while (first < last && !comp(*--last, pivot))
+            ;
+    else
+        while (!comp(*--last, pivot))
+            ;
+
+    // If the first pair of elements that should be swapped to partition are the same element,
+    // the passed in sequence already was correctly partitioned.
+    bool already_partitioned = first >= last;
+    if (!already_partitioned) {
+        std::iter_swap(first, last);
+        ++first;
+
+        // The following branchless partitioning is derived from "BlockQuicksort: How Branch
+        // Mispredictions don’t affect Quicksort" by Stefan Edelkamp and Armin Weiss, but
+        // heavily micro-optimized.
+        unsigned char offsets_l_storage[block_size + cacheline_size];
+        unsigned char offsets_r_storage[block_size + cacheline_size];
+        unsigned char* offsets_l = align_cacheline(offsets_l_storage);
+        unsigned char* offsets_r = align_cacheline(offsets_r_storage);
+
+        Iter offsets_l_base = first;
+        Iter offsets_r_base = last;
+        size_t num_l, num_r, start_l, start_r;
+        num_l = num_r = start_l = start_r = 0;
+
+        while (first < last) {
+            // Fill up offset blocks with elements that are on the wrong side.
+            // First we determine how many elements are considered for each offset block.
+            size_t num_unknown = last - first;
+            size_t left_split = num_l == 0 ? (num_r == 0 ? num_unknown / 2 : num_unknown) : 0;
+            size_t right_split = num_r == 0 ? (num_unknown - left_split) : 0;
+
+            // Fill the offset blocks.
+            if (left_split >= block_size) {
+                for (size_t i = 0; i < block_size;) {
+                    offsets_l[num_l] = i++;
+                    num_l += !comp(*first, pivot);
+                    ++first;
+                    offsets_l[num_l] = i++;
+                    num_l += !comp(*first, pivot);
+                    ++first;
+                    offsets_l[num_l] = i++;
+                    num_l += !comp(*first, pivot);
+                    ++first;
+                    offsets_l[num_l] = i++;
+                    num_l += !comp(*first, pivot);
+                    ++first;
+                    offsets_l[num_l] = i++;
+                    num_l += !comp(*first, pivot);
+                    ++first;
+                    offsets_l[num_l] = i++;
+                    num_l += !comp(*first, pivot);
+                    ++first;
+                    offsets_l[num_l] = i++;
+                    num_l += !comp(*first, pivot);
+                    ++first;
+                    offsets_l[num_l] = i++;
+                    num_l += !comp(*first, pivot);
+                    ++first;
+                }
+            } else {
+                for (size_t i = 0; i < left_split;) {
+                    offsets_l[num_l] = i++;
+                    num_l += !comp(*first, pivot);
+                    ++first;
+                }
+            }
+
+            if (right_split >= block_size) {
+                for (size_t i = 0; i < block_size;) {
+                    offsets_r[num_r] = ++i;
+                    num_r += comp(*--last, pivot);
+                    offsets_r[num_r] = ++i;
+                    num_r += comp(*--last, pivot);
+                    offsets_r[num_r] = ++i;
+                    num_r += comp(*--last, pivot);
+                    offsets_r[num_r] = ++i;
+                    num_r += comp(*--last, pivot);
+                    offsets_r[num_r] = ++i;
+                    num_r += comp(*--last, pivot);
+                    offsets_r[num_r] = ++i;
+                    num_r += comp(*--last, pivot);
+                    offsets_r[num_r] = ++i;
+                    num_r += comp(*--last, pivot);
+                    offsets_r[num_r] = ++i;
+                    num_r += comp(*--last, pivot);
+                }
+            } else {
+                for (size_t i = 0; i < right_split;) {
+                    offsets_r[num_r] = ++i;
+                    num_r += comp(*--last, pivot);
+                }
+            }
+
+            // Swap elements and update block sizes and first/last boundaries.
+            size_t num = std::min(num_l, num_r);
+            swap_offsets(offsets_l_base, offsets_r_base, offsets_l + start_l, offsets_r + start_r,
+                         num, num_l == num_r);
+            num_l -= num;
+            num_r -= num;
+            start_l += num;
+            start_r += num;
+
+            if (num_l == 0) {
+                start_l = 0;
+                offsets_l_base = first;
+            }
+
+            if (num_r == 0) {
+                start_r = 0;
+                offsets_r_base = last;
+            }
+        }
+
+        // We have now fully identified [first, last)'s proper position. Swap the last elements.
+        if (num_l) {
+            offsets_l += start_l;
+            while (num_l--) std::iter_swap(offsets_l_base + offsets_l[num_l], --last);
+            first = last;
+        }
+        if (num_r) {
+            offsets_r += start_r;
+            while (num_r--) std::iter_swap(offsets_r_base - offsets_r[num_r], first), ++first;
+            last = first;
+        }
+    }
+
+    // Put the pivot in the right place.
+    Iter pivot_pos = first - 1;
+    *begin = PDQSORT_PREFER_MOVE(*pivot_pos);
+    *pivot_pos = PDQSORT_PREFER_MOVE(pivot);
+
+    return std::make_pair(pivot_pos, already_partitioned);
+}
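
For illustration only (not part of the committed header): the unrolled loops above rely on the branch-free idiom "offsets_l[num_l] = i++; num_l += !comp(*first, pivot);", which writes a candidate offset unconditionally and advances the count only when the element really belongs on the other side, so the comparison result feeds an addition rather than a conditional branch. A compact sketch of the same idiom on a small array (the names mirror the function above, but the snippet is illustrative only):

    #include <cstddef>
    #include <iostream>
    #include <vector>

    int main() {
        std::vector<int> v = {3, 9, 1, 7, 5, 8, 2, 6};
        const int pivot = 5;
        unsigned char offsets_l[8];
        std::size_t num_l = 0;
        for (std::size_t i = 0; i < v.size(); ++i) {
            offsets_l[num_l] = static_cast<unsigned char>(i); // always write the offset
            num_l += !(v[i] < pivot);                          // advance only for elements >= pivot
        }
        for (std::size_t i = 0; i < num_l; ++i) std::cout << int(offsets_l[i]) << ' ';
        std::cout << '\n'; // prints "1 3 4 5 7": the positions of elements >= 5
        return 0;
    }
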
+
+// Partitions [begin, end) around pivot *begin using comparison function comp. Elements equal
+// to the pivot are put in the right-hand partition. Returns the position of the pivot after
+// partitioning and whether the passed sequence already was correctly partitioned. Assumes the
+// pivot is a median of at least 3 elements and that [begin, end) is at least
+// insertion_sort_threshold long.
+template <class Iter, class Compare>
+inline std::pair<Iter, bool> partition_right(Iter begin, Iter end, Compare comp) {
+    typedef typename std::iterator_traits<Iter>::value_type T;
+
+    // Move pivot into local for speed.
+    T pivot(PDQSORT_PREFER_MOVE(*begin));
+
+    Iter first = begin;
+    Iter last = end;
+
+    // Find the first element greater than or equal to the pivot (the median of 3 guarantees
+    // this exists).
+    while (comp(*++first, pivot))
+        ;
+
+    // Find the first element strictly smaller than the pivot. We have to guard this search if
+    // there was no element before *first.
+    if (first - 1 == begin)
+        while (first < last && !comp(*--last, pivot))
+            ;
+    else
+        while (!comp(*--last, pivot))
+            ;
+
+    // If the first pair of elements that should be swapped to partition are the same element,
+    // the passed in sequence already was correctly partitioned.
+    bool already_partitioned = first >= last;
+
+    // Keep swapping pairs of elements that are on the wrong side of the pivot. Previously
+    // swapped pairs guard the searches, which is why the first iteration is special-cased
+    // above.
+    while (first < last) {
+        std::iter_swap(first, last);
+        while (comp(*++first, pivot))
+            ;
+        while (!comp(*--last, pivot))
+            ;
+    }
+
+    // Put the pivot in the right place.
+    Iter pivot_pos = first - 1;
+    *begin = PDQSORT_PREFER_MOVE(*pivot_pos);
+    *pivot_pos = PDQSORT_PREFER_MOVE(pivot);
+
+    return std::make_pair(pivot_pos, already_partitioned);
+}
+
+// Similar function to the one above, except elements equal to the pivot are put to the left of
+// the pivot and it doesn't check or return if the passed sequence already was partitioned.
+// Since this is rarely used (the many equal case), and in that case pdqsort already has O(n)
+// performance, no block quicksort is applied here for simplicity.
+template <class Iter, class Compare>
+inline Iter partition_left(Iter begin, Iter end, Compare comp) {
+    typedef typename std::iterator_traits<Iter>::value_type T;
+
+    T pivot(PDQSORT_PREFER_MOVE(*begin));
+    Iter first = begin;
+    Iter last = end;
+
+    while (comp(pivot, *--last))
+        ;
+
+    if (last + 1 == end)
+        while (first < last && !comp(pivot, *++first))
+            ;
+    else
+        while (!comp(pivot, *++first))
+            ;
+
+    while (first < last) {
+        std::iter_swap(first, last);
+        while (comp(pivot, *--last))
+            ;
+        while (!comp(pivot, *++first))
+            ;
+    }
+
+    Iter pivot_pos = last;
+    *begin = PDQSORT_PREFER_MOVE(*pivot_pos);
+    *pivot_pos = PDQSORT_PREFER_MOVE(pivot);
+
+    return pivot_pos;
+}
+
+template <class Iter, class Compare, bool Branchless>
+inline void pdqsort_loop(Iter begin, Iter end, Compare comp, int bad_allowed,
+                         bool leftmost = true) {
+    typedef typename std::iterator_traits<Iter>::difference_type diff_t;
+
+    // Use a while loop for tail recursion elimination.
+    while (true) {
+        diff_t size = end - begin;
+
+        // Insertion sort is faster for small arrays.
+        if (size < insertion_sort_threshold) {
+            if (leftmost)
+                insertion_sort(begin, end, comp);
+            else
+                unguarded_insertion_sort(begin, end, comp);
+            return;
+        }
+
+        // Choose pivot as median of 3 or pseudomedian of 9.
+        diff_t s2 = size / 2;
+        if (size > ninther_threshold) {
+            sort3(begin, begin + s2, end - 1, comp);
+            sort3(begin + 1, begin + (s2 - 1), end - 2, comp);
+            sort3(begin + 2, begin + (s2 + 1), end - 3, comp);
+            sort3(begin + (s2 - 1), begin + s2, begin + (s2 + 1), comp);
+            std::iter_swap(begin, begin + s2);
+        } else
+            sort3(begin + s2, begin, end - 1, comp);
+
+        // If *(begin - 1) is the end of the right partition of a previous partition operation
+        // there is no element in [begin, end) that is smaller than *(begin - 1). Then if our
+        // pivot compares equal to *(begin - 1) we change strategy, putting equal elements in
+        // the left partition, greater elements in the right partition. We do not have to
+        // recurse on the left partition, since it's sorted (all equal).
+        if (!leftmost && !comp(*(begin - 1), *begin)) {
+            begin = partition_left(begin, end, comp) + 1;
+            continue;
+        }
+
+        // Partition and get results.
+        std::pair<Iter, bool> part_result = Branchless
+                                                    ? partition_right_branchless(begin, end, comp)
+                                                    : partition_right(begin, end, comp);
+        Iter pivot_pos = part_result.first;
+        bool already_partitioned = part_result.second;
+
+        // Check for a highly unbalanced partition.
+        diff_t l_size = pivot_pos - begin;
+        diff_t r_size = end - (pivot_pos + 1);
+        bool highly_unbalanced = l_size < size / 8 || r_size < size / 8;
+
+        // If we got a highly unbalanced partition we shuffle elements to break many patterns.
+        if (highly_unbalanced) {
+            // If we had too many bad partitions, switch to heapsort to guarantee O(n log n).
+            if (--bad_allowed == 0) {
+                std::make_heap(begin, end, comp);
+                std::sort_heap(begin, end, comp);
+                return;
+            }
+
+            if (l_size >= insertion_sort_threshold) {
+                std::iter_swap(begin, begin + l_size / 4);
+                std::iter_swap(pivot_pos - 1, pivot_pos - l_size / 4);
+
+                if (l_size > ninther_threshold) {
+                    std::iter_swap(begin + 1, begin + (l_size / 4 + 1));
+                    std::iter_swap(begin + 2, begin + (l_size / 4 + 2));
+                    std::iter_swap(pivot_pos - 2, pivot_pos - (l_size / 4 + 1));
+                    std::iter_swap(pivot_pos - 3, pivot_pos - (l_size / 4 + 2));
+                }
+            }
+
+            if (r_size >= insertion_sort_threshold) {
+                std::iter_swap(pivot_pos + 1, pivot_pos + (1 + r_size / 4));
+                std::iter_swap(end - 1, end - r_size / 4);
+
+                if (r_size > ninther_threshold) {
+                    std::iter_swap(pivot_pos + 2, pivot_pos + (2 + r_size / 4));
+                    std::iter_swap(pivot_pos + 3, pivot_pos + (3 + r_size / 4));
+                    std::iter_swap(end - 2, end - (1 + r_size / 4));
+                    std::iter_swap(end - 3, end - (2 + r_size / 4));
+                }
+            }
+        } else {
+            // If we were decently balanced and we tried to sort an already partitioned
+            // sequence, try to use insertion sort.
+            if (already_partitioned && partial_insertion_sort(begin, pivot_pos, comp) &&
+                partial_insertion_sort(pivot_pos + 1, end, comp))
+                return;
+        }
+
+        // Sort the left partition first using recursion and do tail recursion elimination for
+        // the right-hand partition.
+        pdqsort_loop<Iter, Compare, Branchless>(begin, pivot_pos, comp, bad_allowed, leftmost);
+        begin = pivot_pos + 1;
+        leftmost = false;
+    }
+}
+} // namespace pdqsort_detail
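
For illustration only (not part of the committed header): when a partition is larger than ninther_threshold, the loop above selects its pivot as a pseudomedian of 9 (Tukey's ninther): a median of three is taken near the front, the middle and the back of the range, and the pivot is the median of those three medians, so it can never be an extreme of the nine samples. A small sketch of that selection, using a hypothetical median3 helper in place of the in-place sort3 calls:

    #include <algorithm>
    #include <cassert>
    #include <vector>

    // Hypothetical helper: median of three values (the value sort3 leaves in the middle slot).
    static int median3(int a, int b, int c) {
        return std::max(std::min(a, b), std::min(std::max(a, b), c));
    }

    int main() {
        std::vector<int> v(200);
        for (int i = 0; i < 200; ++i) v[i] = (i * 37) % 200; // a permutation of 0..199

        int s2 = 200 / 2;
        int m1 = median3(v[0], v[s2], v[199]);     // front / middle / back
        int m2 = median3(v[1], v[s2 - 1], v[198]);
        int m3 = median3(v[2], v[s2 + 1], v[197]);
        int pivot = median3(m1, m2, m3);

        assert(pivot > 0 && pivot < 199); // a middling value, not an extreme of the samples
        assert(pivot == 89);              // the value picked for this particular permutation
        return 0;
    }
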
+
+template <class Iter, class Compare>
+inline void pdqsort(Iter begin, Iter end, Compare comp) {
+    if (begin == end) return;
+
+#if __cplusplus >= 201103L
+    pdqsort_detail::pdqsort_loop<
+            Iter, Compare,
+            pdqsort_detail::is_default_compare<typename std::decay<Compare>::type>::value &&
+                    std::is_arithmetic<typename std::iterator_traits<Iter>::value_type>::value>(
+            begin, end, comp, pdqsort_detail::log2(end - begin));
+#else
+    pdqsort_detail::pdqsort_loop<Iter, Compare, false>(begin, end, comp,
+                                                       pdqsort_detail::log2(end - begin));
+#endif
+}
+
+template <class Iter>
+inline void pdqsort(Iter begin, Iter end) {
+    typedef typename std::iterator_traits<Iter>::value_type T;
+    pdqsort(begin, end, std::less<T>());
+}
+
+template <class Iter, class Compare>
+inline void pdqsort_branchless(Iter begin, Iter end, Compare comp) {
+    if (begin == end) return;
+    pdqsort_detail::pdqsort_loop<Iter, Compare, true>(begin, end, comp,
+                                                      pdqsort_detail::log2(end - begin));
+}
+
+template <class Iter>
+inline void pdqsort_branchless(Iter begin, Iter end) {
+    typedef typename std::iterator_traits<Iter>::value_type T;
+    pdqsort_branchless(begin, end, std::less<T>());
+}
+
+#undef PDQSORT_PREFER_MOVE
+
+#endif
\ No newline at end of file
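
For illustration only (not part of the committed header): the two pairs of overloads at the end give pdqsort and pdqsort_branchless the same iterator/comparator call shape as std::sort, so a call site can switch by changing only the function name. A minimal usage sketch, assuming the header is reachable as "pdqsort.h" on the include path:

    #include <cassert>
    #include <cstddef>
    #include <vector>

    #include "pdqsort.h" // assumed include path; adjust to the file's actual location

    int main() {
        std::vector<int> v = {5, 3, 9, 1, 4, 8, 2, 7, 6, 0};

        // Same call shape as std::sort(v.begin(), v.end()). With the default
        // comparator and an arithmetic value type, the C++11 dispatch above
        // selects the branchless partitioning path automatically.
        pdqsort(v.begin(), v.end());
        for (std::size_t i = 0; i < v.size(); ++i) assert(v[i] == static_cast<int>(i));

        // Custom comparator, explicitly requesting the branchless variant.
        pdqsort_branchless(v.begin(), v.end(), [](int a, int b) { return a > b; });
        assert(v.front() == 9 && v.back() == 0);
        return 0;
    }
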


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org