You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by li...@apache.org on 2022/05/25 13:12:31 UTC

[incubator-doris] branch master updated: [stream-load-vec]: memtable flush only if necessary after aggregated (#9459)

This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 73e31a2179 [stream-load-vec]: memtable flush only if necessary after aggregated (#9459)
73e31a2179 is described below

commit 73e31a21799c15abde903502ad5b73138b01c336
Author: spaces-x <we...@gmail.com>
AuthorDate: Wed May 25 21:12:24 2022 +0800

    [stream-load-vec]: memtable flush only if necessary after aggregated (#9459)
    
    
    Co-authored-by: weixiang <we...@meituan.com>
---
 be/src/common/config.h       |  3 +++
 be/src/olap/delta_writer.cpp |  9 ++++---
 be/src/olap/memtable.cpp     | 56 +++++++++++++++++++++++++++++++++++---------
 be/src/olap/memtable.h       | 10 +++++++-
 be/src/vec/core/block.cpp    | 19 +++++++++++++++
 be/src/vec/core/block.h      |  7 ++++++
 6 files changed, 89 insertions(+), 15 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index fcfc2e3f78..a73efcf24d 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -464,6 +464,9 @@ CONF_Int32(memory_max_alignment, "16");
 // write buffer size before flush
 CONF_mInt64(write_buffer_size, "209715200");
 
+// max memtable buffer size for aggregated (non-DUP_KEYS) tables; once reached, the memtable is shrunk by aggregation
+CONF_mInt64(memtable_max_buffer_size, "419430400");
+
 // following 2 configs limit the memory consumption of load process on a Backend.
 // eg: memory limit to 80% of mem limit config but up to 100GB(default)
 // NOTICE(cmy): set these default values very large because we don't want to
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 11ee242449..fe0ffeddbb 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -203,9 +203,12 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>
         }
     }
 
-    if (_mem_table->memory_usage() >= config::write_buffer_size) {
-        RETURN_NOT_OK(_flush_memtable_async());
-        _reset_mem_table();
+    if (_mem_table->need_to_agg()) {
+        _mem_table->shrink_memtable_by_agg();
+        if (_mem_table->is_flush()) {
+            RETURN_NOT_OK(_flush_memtable_async());
+            _reset_mem_table();
+        }
     }
 
     return Status::OK();
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index c7f94ead4f..cf58f7e4ba 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -126,11 +126,10 @@ void MemTable::insert(const vectorized::Block* block, size_t row_pos, size_t num
         }
     }
     size_t cursor_in_mutableblock = _input_mutable_block.rows();
-    size_t oldsize = _input_mutable_block.allocated_bytes();
     _input_mutable_block.add_rows(block, row_pos, num_rows);
-    size_t newsize = _input_mutable_block.allocated_bytes();
-    _mem_usage += newsize - oldsize;
-    _mem_tracker->consume(newsize - oldsize);
+    size_t input_size = block->allocated_bytes() * num_rows / block->rows();
+    _mem_usage += input_size;
+    _mem_tracker->consume(input_size);
 
     for (int i = 0; i < num_rows; i++) {
         _row_in_blocks.emplace_back(new RowInBlock {cursor_in_mutableblock + i});
@@ -242,7 +241,8 @@ void MemTable::_aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_
                                  new_row->_row_pos, nullptr);
     }
 }
-vectorized::Block MemTable::_collect_vskiplist_results() {
+template <bool is_final>
+void MemTable::_collect_vskiplist_results() {
     VecTable::Iterator it(_vec_skip_list.get());
     vectorized::Block in_block = _input_mutable_block.to_block();
     // TODO: should try to insert data by column, not by row. to opt the code
@@ -251,6 +251,7 @@ vectorized::Block MemTable::_collect_vskiplist_results() {
             _output_mutable_block.add_row(&in_block, it.key()->_row_pos);
         }
     } else {
+        size_t idx = 0;
         for (it.SeekToFirst(); it.Valid(); it.Next()) {
             auto& block_data = in_block.get_columns_with_type_and_name();
             // move key columns
@@ -263,11 +264,46 @@ vectorized::Block MemTable::_collect_vskiplist_results() {
                 auto function = _agg_functions[i];
                 function->insert_result_into(it.key()->_agg_places[i],
                                              *(_output_mutable_block.get_column_by_position(i)));
-                function->destroy(it.key()->_agg_places[i]);
+                if constexpr (is_final) {
+                    function->destroy(it.key()->_agg_places[i]);
+                }
             }
+            if constexpr (!is_final) {
+                // re-index the row_pos in VSkipList
+                it.key()->_row_pos = idx;
+                idx++;
+            }
+        }
+        if constexpr (!is_final) {
+            // if not final, move the aggregated results into the input block so that inserts can continue
+            size_t shrunked_after_agg = _output_mutable_block.allocated_bytes();
+            _mem_tracker->consume(shrunked_after_agg - _mem_usage);
+            _mem_usage = shrunked_after_agg;
+            _input_mutable_block.swap(_output_mutable_block);
+            // TODO(weixiang): optimize here.
+            std::unique_ptr<vectorized::Block> empty_input_block =
+                    std::move(in_block.create_same_struct_block(0));
+            _output_mutable_block =
+                    vectorized::MutableBlock::build_mutable_block(empty_input_block.get());
+            _output_mutable_block.clear_column_data();
         }
     }
-    return _output_mutable_block.to_block();
+}
+
+void MemTable::shrink_memtable_by_agg() {
+    if (_keys_type == KeysType::DUP_KEYS) {
+        return;
+    }
+    _collect_vskiplist_results<false>();
+}
+
+bool MemTable::is_flush() {
+    return memory_usage() >= config::write_buffer_size;
+}
+
+bool MemTable::need_to_agg() {
+    return _keys_type == KeysType::DUP_KEYS ? is_flush()
+                                            : memory_usage() >= config::memtable_max_buffer_size;
 }
 
 Status MemTable::flush() {
@@ -301,10 +337,8 @@ Status MemTable::_do_flush(int64_t& duration_ns) {
             RETURN_NOT_OK(st);
         }
     } else {
-        vectorized::Block block = _collect_vskiplist_results();
-        // beta rowset flush parallel, segment write add block is not
-        // thread safe, so use tmp variable segment_write instead of
-        // member variable
+        _collect_vskiplist_results<true>();
+        vectorized::Block block = _output_mutable_block.to_block();
         RETURN_NOT_OK(_rowset_writer->flush_single_memtable(&block));
         _flush_size = block.allocated_bytes();
     }
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 910cd92270..c73b39b39d 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -54,6 +54,12 @@ public:
     // insert tuple from (row_pos) to (row_pos+num_rows)
     void insert(const vectorized::Block* block, size_t row_pos, size_t num_rows);
 
+    void shrink_memtable_by_agg();
+
+    bool is_flush();
+
+    bool need_to_agg();
+
     /// Flush
     Status flush();
     Status close();
@@ -195,7 +201,9 @@ private:
     //for vectorized
     vectorized::MutableBlock _input_mutable_block;
     vectorized::MutableBlock _output_mutable_block;
-    vectorized::Block _collect_vskiplist_results();
+
+    template <bool is_final>
+    void _collect_vskiplist_results();
     bool _is_first_insertion;
 
     void _init_agg_functions(const vectorized::Block* block);
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 4866a1129e..821231684f 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -881,6 +881,17 @@ size_t MutableBlock::rows() const {
     return 0;
 }
 
+void MutableBlock::swap(MutableBlock& another) noexcept {
+    _columns.swap(another._columns);
+    _data_types.swap(another._data_types);
+}
+
+void MutableBlock::swap(MutableBlock&& another) noexcept {
+    clear();
+    _columns = std::move(another._columns);
+    _data_types = std::move(another._data_types);
+}
+
 void MutableBlock::add_row(const Block* block, int row) {
     auto& block_data = block->get_columns_with_type_and_name();
     for (size_t i = 0; i < _columns.size(); ++i) {
@@ -1008,4 +1019,12 @@ size_t MutableBlock::allocated_bytes() const {
     return res;
 }
 
+void MutableBlock::clear_column_data() noexcept {
+    for (auto& col : _columns) {
+        if (col) {
+            col->clear();
+        }
+    }
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 729f531291..532e9257aa 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -428,6 +428,10 @@ public:
 
     Block to_block(int start_column, int end_column);
 
+    void swap(MutableBlock& other) noexcept;
+
+    void swap(MutableBlock&& other) noexcept;
+
     void add_row(const Block* block, int row);
     void add_rows(const Block* block, const int* row_begin, const int* row_end);
     void add_rows(const Block* block, size_t row_begin, size_t length);
@@ -438,6 +442,9 @@ public:
         _columns.clear();
         _data_types.clear();
     }
+
+    void clear_column_data() noexcept;
+
     size_t allocated_bytes() const;
 };
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org