You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/05/11 07:55:01 UTC

[GitHub] [incubator-doris] HappenLee commented on a diff in pull request #9459: [stream-load-vec]: memtable flush only if necessary after aggregated

HappenLee commented on code in PR #9459:
URL: https://github.com/apache/incubator-doris/pull/9459#discussion_r869979883


##########
be/src/olap/memtable.cpp:
##########
@@ -98,6 +99,7 @@ void MemTable::_init_agg_functions(const vectorized::Block* block) {
 MemTable::~MemTable() {
     std::for_each(_row_in_blocks.begin(), _row_in_blocks.end(), std::default_delete<RowInBlock>());
     _mem_tracker->release(_mem_usage);
+    print_profile();

Review Comment:
   We should not print the profile every time (here it would be printed on every MemTable destruction).



##########
be/src/olap/memtable.cpp:
##########
@@ -115,53 +117,61 @@ int MemTable::RowInBlockComparator::operator()(const RowInBlock* left,
 }
 
 void MemTable::insert(const vectorized::Block* block, size_t row_pos, size_t num_rows) {
-    if (_is_first_insertion) {
-        _is_first_insertion = false;
-        auto cloneBlock = block->clone_without_columns();
-        _input_mutable_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock);
-        _vec_row_comparator->set_block(&_input_mutable_block);
-        _output_mutable_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock);
-        if (_keys_type != KeysType::DUP_KEYS) {
-            _init_agg_functions(block);
+    {
+        SCOPED_TIMER(_insert_time);
+        if (_is_first_insertion) {
+            _is_first_insertion = false;
+            auto cloneBlock = block->clone_without_columns();
+            _input_mutable_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock);
+            _vec_row_comparator->set_block(&_input_mutable_block);
+            _output_mutable_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock);
+            if (_keys_type != KeysType::DUP_KEYS) {
+                _init_agg_functions(block);
+            }
+        }
+        size_t cursor_in_mutableblock = _input_mutable_block.rows();
+        size_t oldsize = _input_mutable_block.allocated_bytes();
+        _input_mutable_block.add_rows(block, row_pos, num_rows);
+        size_t newsize = _input_mutable_block.allocated_bytes();
+        _mem_usage += newsize - oldsize;
+        _mem_tracker->consume(newsize - oldsize);
+        // when new data inserted, the mem_usage of memtable should be re-shrunk again.
+        _is_shrunk_by_agg = false;
+
+        for (int i = 0; i < num_rows; i++) {
+            _row_in_blocks.emplace_back(new RowInBlock {cursor_in_mutableblock + i});
+            _insert_one_row_from_block(_row_in_blocks.back());
         }
-    }
-    size_t cursor_in_mutableblock = _input_mutable_block.rows();
-    size_t oldsize = _input_mutable_block.allocated_bytes();
-    _input_mutable_block.add_rows(block, row_pos, num_rows);
-    size_t newsize = _input_mutable_block.allocated_bytes();
-    _mem_usage += newsize - oldsize;
-    _mem_tracker->consume(newsize - oldsize);
-
-    for (int i = 0; i < num_rows; i++) {
-        _row_in_blocks.emplace_back(new RowInBlock {cursor_in_mutableblock + i});
-        _insert_one_row_from_block(_row_in_blocks.back());
     }
 }
 
 void MemTable::_insert_one_row_from_block(RowInBlock* row_in_block) {
-    _rows++;
-    bool overwritten = false;
-    if (_keys_type == KeysType::DUP_KEYS) {
-        // TODO: dup keys only need sort opertaion. Rethink skiplist is the beat way to sort columns?
-        _vec_skip_list->Insert(row_in_block, &overwritten);
-        DCHECK(!overwritten) << "Duplicate key model meet overwrite in SkipList";
-        return;
-    }
-
-    bool is_exist = _vec_skip_list->Find(row_in_block, &_vec_hint);
-    if (is_exist) {
-        _aggregate_two_row_in_block(row_in_block, _vec_hint.curr->key);
-    } else {
-        row_in_block->init_agg_places(_agg_functions, _schema->num_key_columns());
-        for (auto cid = _schema->num_key_columns(); cid < _schema->num_columns(); cid++) {
-            auto col_ptr = _input_mutable_block.mutable_columns()[cid].get();
-            auto place = row_in_block->_agg_places[cid];
-            _agg_functions[cid]->add(place,
-                                     const_cast<const doris::vectorized::IColumn**>(&col_ptr),
-                                     row_in_block->_row_pos, nullptr);
+    {
+        SCOPED_TIMER(_sort_agg_time);

Review Comment:
   Timers have a non-trivial overhead, so I think we should enable them only when debugging. Timing every row also significantly hurts execution efficiency; we need a cheaper timing approach, even if it is less precise.



##########
be/src/olap/memtable.cpp:
##########
@@ -263,22 +274,56 @@ vectorized::Block MemTable::_collect_vskiplist_results() {
                 auto function = _agg_functions[i];
                 function->insert_result_into(it.key()->_agg_places[i],
                                              *(_output_mutable_block.get_column_by_position(i)));
-                function->destroy(it.key()->_agg_places[i]);
+                if (final) {
+                    function->destroy(it.key()->_agg_places[i]);
+                }
             }
+            // re-index the row_pos in VSkipList
+            it.key()->_row_pos = idx;
+            idx++;
+        }
+        if (!final) {
+            _input_mutable_block.swap(_output_mutable_block);
+            //TODO(weixang):opt here.
+            _output_mutable_block = vectorized::MutableBlock::build_mutable_block(&in_block);
+            _output_mutable_block.clear_column_data();
+        }
+    }
+}
+
+void MemTable::shrink_memtable_by_agg() {
+    {
+        SCOPED_TIMER(_shrunk_agg_time);
+        if (_is_shrunk_by_agg) {
+            return;
         }
+        size_t old_size = _input_mutable_block.allocated_bytes();
+        _collect_vskiplist_to_output(false);
+        size_t new_size = _input_mutable_block.allocated_bytes();
+        // shrink mem usage of memetable after agged.
+        _mem_usage += new_size - old_size;
+        _mem_tracker->consume(new_size - old_size);
+        _is_shrunk_by_agg = true;
     }
-    return _output_mutable_block.to_block();
+}
+
+bool MemTable::is_full() {
+    return memory_usage() >= config::write_buffer_size;
 }
 
 Status MemTable::flush() {
-    VLOG_CRITICAL << "begin to flush memtable for tablet: " << _tablet_id
+    clock_t now = clock();

Review Comment:
   This performs too many logging operations.



##########
be/src/olap/memtable.cpp:
##########
@@ -263,22 +274,56 @@ vectorized::Block MemTable::_collect_vskiplist_results() {
                 auto function = _agg_functions[i];
                 function->insert_result_into(it.key()->_agg_places[i],
                                              *(_output_mutable_block.get_column_by_position(i)));
-                function->destroy(it.key()->_agg_places[i]);
+                if (final) {

Review Comment:
   Making `final` a template parameter and branching on it with `if constexpr` would be more efficient here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org