You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/05/03 09:31:48 UTC

[incubator-doris] branch master updated: [fix](compaction) fix bug for vectorized compaction (#9344)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 49a0cd1925 [fix](compaction) fix bug for vectorized compaction (#9344)
49a0cd1925 is described below

commit 49a0cd19253cb46cde38b255963875ca51b181e2
Author: Gabriel <ga...@gmail.com>
AuthorDate: Tue May 3 17:31:40 2022 +0800

    [fix](compaction) fix bug for vectorized compaction (#9344)
    
    1. Add a BE config option to switch vectorized compaction on or off.
    2. Fix a vectorized compaction bug where the row statistics were incorrect.
---
 be/src/common/config.h                    |  2 ++
 be/src/olap/compaction.cpp                | 17 ++++++++++++-----
 be/src/olap/merger.cpp                    |  7 ++-----
 be/src/olap/rowset/beta_rowset_writer.cpp |  3 +++
 be/src/vec/olap/block_reader.cpp          |  6 ++++--
 5 files changed, 23 insertions(+), 12 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 4d59fd94d4..ac8ddd8ff2 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -240,6 +240,8 @@ CONF_Bool(enable_low_cardinality_optimize, "false");
 // be policy
 // whether disable automatic compaction task
 CONF_mBool(disable_auto_compaction, "false");
+// whether enable vectorized compaction
+CONF_Bool(enable_vectorized_compaction, "false");
 // check the configuration of auto compaction in seconds when auto compaction disabled
 CONF_mInt32(check_auto_compaction_interval_seconds, "5");
 
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 156977a9ba..02b41d0aef 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -86,10 +86,17 @@ Status Compaction::do_compaction_impl(int64_t permits) {
     // 2. write merged rows to output rowset
     // The test results show that merger is low-memory-footprint, there is no need to tracker its mem pool
     Merger::Statistics stats;
-    auto res = Merger::vmerge_rowsets(_tablet, compaction_type(), _input_rs_readers,
-                                      _output_rs_writer.get(), &stats);
+    Status res;
+    if (config::enable_vectorized_compaction) {
+        res = Merger::vmerge_rowsets(_tablet, compaction_type(), _input_rs_readers,
+                                     _output_rs_writer.get(), &stats);
+    } else {
+        res = Merger::merge_rowsets(_tablet, compaction_type(), _input_rs_readers,
+                                    _output_rs_writer.get(), &stats);
+    }
+    string merge_type = config::enable_vectorized_compaction ? "v" : "";
     if (!res.ok()) {
-        LOG(WARNING) << "fail to do " << compaction_name() << ". res=" << res
+        LOG(WARNING) << "fail to do " << merge_type << compaction_name() << ". res=" << res
                      << ", tablet=" << _tablet->full_name()
                      << ", output_version=" << _output_version;
         return res;
@@ -132,8 +139,8 @@ Status Compaction::do_compaction_impl(int64_t permits) {
         current_max_version = _tablet->rowset_with_max_version()->end_version();
     }
 
-    LOG(INFO) << "succeed to do " << compaction_name() << ". tablet=" << _tablet->full_name()
-              << ", output_version=" << _output_version
+    LOG(INFO) << "succeed to do " << merge_type << compaction_name()
+              << ". tablet=" << _tablet->full_name() << ", output_version=" << _output_version
               << ", current_max_version=" << current_max_version
               << ", disk=" << _tablet->data_dir()->path() << ", segments=" << segments_num
               << ". elapsed time=" << watch.get_elapse_second()
diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp
index 42759f6b6f..f55b08f437 100644
--- a/be/src/olap/merger.cpp
+++ b/be/src/olap/merger.cpp
@@ -107,15 +107,12 @@ Status Merger::vmerge_rowsets(TabletSharedPtr tablet, ReaderType reader_type,
 
     vectorized::Block block = schema.create_block(reader_params.return_columns);
     size_t output_rows = 0;
-    while (true) {
-        bool eof = false;
+    bool eof = false;
+    while (!eof) {
         // Read one block from block reader
         RETURN_NOT_OK_LOG(
                 reader.next_block_with_aggregation(&block, nullptr, nullptr, &eof),
                 "failed to read next block when merging rowsets of tablet " + tablet->full_name());
-        if (eof) {
-            break;
-        }
         RETURN_NOT_OK_LOG(
                 dst_rowset_writer->add_block(&block),
                 "failed to write block when merging rowsets of tablet " + tablet->full_name());
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index 88eef92b2f..d1dda3e89d 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -323,6 +323,9 @@ Status BetaRowsetWriter::_create_segment_writer(
 }
 
 Status BetaRowsetWriter::_flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer) {
+    if ((*writer)->num_rows_written() == 0) {
+        return Status::OK();
+    }
     uint64_t segment_size;
     uint64_t index_size;
     Status s = (*writer)->finalize(&segment_size, &index_size);
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index 30dd2f3279..78056d8fb8 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -192,6 +192,7 @@ Status BlockReader::_agg_key_next_block(Block* block, MemPool* mem_pool, ObjectP
     }
 
     auto target_block_row = 0;
+    auto merged_row = 0;
     auto target_columns = block->mutate_columns();
 
     _insert_data_normal(target_columns);
@@ -218,6 +219,8 @@ Status BlockReader::_agg_key_next_block(Block* block, MemPool* mem_pool, ObjectP
 
             _insert_data_normal(target_columns);
             target_block_row++;
+        } else {
+            merged_row++;
         }
 
         _append_agg_data(target_columns);
@@ -227,7 +230,7 @@ Status BlockReader::_agg_key_next_block(Block* block, MemPool* mem_pool, ObjectP
     _last_agg_data_counter = 0;
     _update_agg_data(target_columns);
 
-    _merged_rows += target_block_row;
+    _merged_rows += merged_row;
     return Status::OK();
 }
 
@@ -260,7 +263,6 @@ Status BlockReader::_unique_key_next_block(Block* block, MemPool* mem_pool, Obje
         }
     } while (target_block_row < _batch_size);
 
-    _merged_rows += target_block_row;
     return Status::OK();
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org