You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/04/24 11:15:58 UTC

[GitHub] [incubator-doris] morningman commented on a diff in pull request #9157: [Refactor] Refactor the code of vec stream load

morningman commented on code in PR #9157:
URL: https://github.com/apache/incubator-doris/pull/9157#discussion_r857088018


##########
be/src/olap/memtable.h:
##########
@@ -96,25 +95,19 @@ class MemTable {
             }
         }
 
-        RowCursorCell cell(vectorized::MutableBlock* block, int cid){
-            StringRef ref = block->mutable_columns()[cid]->get_data_at(_row_pos);
-            bool is_null = block->mutable_columns()[cid]->is_null_at(_row_pos);
-            NullState null_state = is_null ? NullState::IS_NULL : NullState::NOT_NULL;
-            return RowCursorCell(ref.data, null_state);
-        }
-
         ~RowInBlock() {
             for (auto agg_place : _agg_places) {
                 delete [] agg_place;
             }
         }
     };
+
     class RowInBlockComparator {
     public:
-        RowInBlockComparator(const Schema* schema):_schema(schema){};
+        RowInBlockComparator(const Schema* schema) : _schema(schema) {};
         //call set_block before operator().
         //在第一次insert block时创建的 _input_mutable_block, 所以无法在Comparator的构造函数中获得pblock

Review Comment:
   Please write comments in English.



##########
be/src/olap/memtable.cpp:
##########
@@ -145,38 +126,33 @@ void MemTable::insert(const vectorized::Block* block, size_t row_pos, size_t num
     _mem_tracker->consume(newsize - oldsize);
 
     for(int i = 0; i < num_rows; i++){       
-        RowInBlock* row_in_block_ptr = new RowInBlock(cursor_in_mutableblock + i);
-        rowInBlocks.push_back(row_in_block_ptr);
-        insert_one_row_from_block(row_in_block_ptr);
+        _row_in_blocks.emplace_back(new RowInBlock{cursor_in_mutableblock + i});

Review Comment:
   For duplicate-key tables, `RowInBlock` does not need `_agg_places`.
   Is there a way to avoid allocating them in that case?



##########
be/src/runtime/thread_mem_tracker_mgr.h:
##########
@@ -103,7 +103,7 @@ class ThreadMemTrackerMgr {
     void update_tracker_id(int64_t tracker_id);
 
     void add_tracker(const std::shared_ptr<MemTracker>& mem_tracker) {
-        DCHECK(_mem_trackers.find(mem_tracker->id()) == _mem_trackers.end());
+        // DCHECK(_mem_trackers.find(mem_tracker->id()) == _mem_trackers.end());

Review Comment:
   Why was this `DCHECK` commented out?



##########
be/src/olap/memtable.cpp:
##########
@@ -239,51 +215,40 @@ void MemTable::_aggregate_two_row(const ContiguousRow& src_row, TableKey row_in_
     }
 }
 
-void MemTable::_aggregate_two_rowInBlock(RowInBlock* new_row, RowInBlock* row_in_skiplist){
-    if (_tablet_schema->has_sequence_col())
-    {
+void MemTable::_aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_in_skiplist){
+    if (_tablet_schema->has_sequence_col()) {
         auto sequence_idx = _tablet_schema->sequence_col_idx();
         auto res = _input_mutable_block.compare_at(row_in_skiplist->_row_pos, new_row->_row_pos, sequence_idx, _input_mutable_block, -1);
         // dst sequence column larger than src, don't need to update
         if (res > 0){
-            return ;
+            return;
         }
     }
-    //dst is non-sequence row, or dst sequence is smaller
-    for (uint32_t cid = _schema->num_key_columns(); 
-                    cid < _schema->num_columns();
-                    ++cid) 
-    {
+    // dst is non-sequence row, or dst sequence is smaller
+    for (uint32_t cid = _schema->num_key_columns(); cid < _schema->num_columns(); ++cid) {
         auto place = row_in_skiplist->_agg_places[cid];
-
         auto col_ptr = _input_mutable_block.mutable_columns()[cid].get();
-
-        _agg_functions[cid]->add(place, 
-                const_cast<const doris::vectorized::IColumn**>( &col_ptr),
-                new_row->_row_pos,
-                nullptr
-                );
+        _agg_functions[cid]->add(place, const_cast<const doris::vectorized::IColumn**>( &col_ptr),

Review Comment:
   ```suggestion
           _agg_functions[cid]->add(place, const_cast<const doris::vectorized::IColumn**>(&col_ptr),
   ```



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -110,61 +106,27 @@ OLAPStatus BetaRowsetWriter::add_block(const vectorized::Block* block) {
     size_t block_row_num = block->rows();
     size_t row_avg_size_in_bytes = std::max((size_t)1, block_size_in_bytes / block_row_num);
     size_t row_offset = 0;
-    int64_t segment_capacity_in_bytes = 0;
-    int64_t segment_capacity_in_rows = 0;
-    auto refresh_segment_capacity = [&]() {
-        segment_capacity_in_bytes =
-                (int64_t)MAX_SEGMENT_SIZE - (int64_t)_segment_writer->estimate_segment_size();
-        segment_capacity_in_rows = (int64_t)_context.max_rows_per_segment -
-                                   (int64_t)_segment_writer->num_rows_written();
-    };
-
-    refresh_segment_capacity();
-    if (UNLIKELY(segment_capacity_in_bytes < row_avg_size_in_bytes ||
-                      segment_capacity_in_rows <= 0)) {
-        // no space for another signle row, need flush now
-        RETURN_NOT_OK(_flush_segment_writer(&_segment_writer));
-        RETURN_NOT_OK(_create_segment_writer(&_segment_writer));
-        refresh_segment_capacity();
-    }
 
-    assert(segment_capacity_in_bytes > row_avg_size_in_bytes && segment_capacity_in_rows > 0);
-    if (block_size_in_bytes > segment_capacity_in_bytes ||
-        block_row_num > segment_capacity_in_rows) {
-        size_t segment_max_row_num;
-        size_t input_row_num;
-        do {
-            assert(row_offset < block_row_num);
-            segment_max_row_num =
-                    std::min((size_t)segment_capacity_in_bytes / row_avg_size_in_bytes,
-                             (size_t)segment_capacity_in_rows);
-            input_row_num = std::min(segment_max_row_num, block_row_num - row_offset);
-            assert(input_row_num > 0);
-            auto s = _segment_writer->append_block(block, row_offset, input_row_num);
-            if (UNLIKELY(!s.ok())) {
-                LOG(WARNING) << "failed to append block: " << s.to_string();
-                return OLAP_ERR_WRITER_DATA_WRITE_ERROR;
-            }
+    do {
+        auto max_row_add = _segment_writer->max_row_to_add(row_avg_size_in_bytes);
+        if (UNLIKELY(max_row_add < 1)) {
+            // no space for another signle row, need flush now
+            RETURN_NOT_OK(_flush_segment_writer(&_segment_writer));
+            RETURN_NOT_OK(_create_segment_writer(&_segment_writer));
+            DCHECK(_segment_writer->max_row_to_add(row_avg_size_in_bytes) > 0);
+            continue;

Review Comment:
   Can this `continue` be removed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org