You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/10/27 15:26:39 UTC

[GitHub] [doris] dataroaring commented on a diff in pull request #12866: [enhancement](compaction) introduce segment compaction (#12609)

dataroaring commented on code in PR #12866:
URL: https://github.com/apache/doris/pull/12866#discussion_r1006852204


##########
be/src/olap/olap_server.cpp:
##########
@@ -701,6 +709,19 @@ Status StorageEngine::submit_quick_compaction_task(TabletSharedPtr tablet) {
     return Status::OK();
 }
 
+Status StorageEngine::_handle_seg_compaction(BetaRowsetWriter* writer,
+                                             SegCompactionCandidatesSharedPtr segments) {
+    writer->segcompaction(segments);
+    // return OK here. error will be reported via BetaRowsetWriter::_segcompaction_status
+    return Status::OK();
+}
+
+Status StorageEngine::submit_seg_compaction_task(BetaRowsetWriter* writer,
+                                                 SegCompactionCandidatesSharedPtr segments) {
+    return _seg_compaction_thread_pool->submit_func(
+            std::bind<void>(&StorageEngine::_handle_seg_compaction, this, writer, segments));
+}
+

Review Comment:
   We should wait the task finished when storage engine is deconstucted.



##########
be/src/olap/delta_writer.cpp:
##########
@@ -171,12 +171,12 @@ Status DeltaWriter::write(Tuple* tuple) {
     // if memtable is full, push it to the flush executor,
     // and create a new memtable for incoming data
     if (_mem_table->memory_usage() >= config::write_buffer_size) {
-        if (++_segment_counter > config::max_segment_num_per_rowset) {

Review Comment:
   Is this config item still be used? If no we should remove it.



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -102,6 +115,409 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) {
     return _add_block(block, &_segment_writer);
 }
 
+vectorized::VMergeIterator* BetaRowsetWriter::_get_segcompaction_reader(
+        SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> schema,
+        OlapReaderStatistics* stat, uint64_t* merged_row_stat) {
+    StorageReadOptions read_options;
+    read_options.stats = stat;
+    read_options.use_page_cache = false;
+    read_options.tablet_schema = _context.tablet_schema;
+    std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
+    for (auto& seg_ptr : *segments) {
+        std::unique_ptr<RowwiseIterator> iter;
+        auto s = seg_ptr->new_iterator(*schema, read_options, &iter);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << "]: " << s.to_string();
+            return nullptr;
+        }
+        seg_iterators.push_back(std::move(iter));
+    }
+    std::vector<RowwiseIterator*> iterators;
+    for (auto& owned_it : seg_iterators) {
+        // transfer ownership
+        iterators.push_back(owned_it.release());
+    }
+    bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS);
+    bool is_reverse = false;
+    auto merge_itr =
+            vectorized::new_merge_iterator(iterators, -1, is_unique, is_reverse, merged_row_stat);
+    DCHECK(merge_itr);
+    auto s = merge_itr->init(read_options);
+    if (!s.ok()) {
+        LOG(WARNING) << "failed to init iterator: " << s.to_string();
+        return nullptr;
+    }
+
+    return (vectorized::VMergeIterator*)merge_itr;
+}
+
+std::unique_ptr<segment_v2::SegmentWriter> BetaRowsetWriter::_create_segcompaction_writer(
+        uint64_t begin, uint64_t end) {
+    Status status;
+    std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr;
+    status = _create_segment_writer_for_segcompaction(&writer, begin, end);
+    if (status != Status::OK() || writer == nullptr) {
+        LOG(ERROR) << "failed to create segment writer for begin:" << begin << " end:" << end
+                   << " path:" << writer->get_data_dir()->path();
+        return nullptr;
+    } else {
+        return writer;
+    }
+}
+
+Status BetaRowsetWriter::_delete_original_segments(uint32_t begin, uint32_t end) {
+    auto fs = _rowset_meta->fs();
+    if (!fs) {
+        return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
+    }
+    for (uint32_t i = begin; i <= end; ++i) {
+        auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, i);
+        // Even if an error is encountered, these files that have not been cleaned up
+        // will be cleaned up by the GC background. So here we only print the error
+        // message when we encounter an error.
+        RETURN_NOT_OK_LOG(fs->delete_file(seg_path),
+                          strings::Substitute("Failed to delete file=$0", seg_path));
+    }
+    return Status::OK();
+}
+
+Status BetaRowsetWriter::_rename_compacted_segments(int64_t begin, int64_t end) {
+    int ret;
+    auto src_seg_path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path,
+                                                                    _context.rowset_id, begin, end);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id,
+                                                       _num_segcompacted++);
+    ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+    if (ret) {
+        return Status::OLAPInternalError(OLAP_ERR_ROWSET_RENAME_FILE_FAILED);
+    }
+    return Status::OK();
+}
+
+Status BetaRowsetWriter::_rename_compacted_segment_plain(uint64_t seg_id) {
+    if (seg_id == _num_segcompacted) {
+        return Status::OK();
+    }
+
+    int ret;
+    auto src_seg_path =
+            BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, seg_id);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id,
+                                                       _num_segcompacted);
+    LOG(INFO) << "segcompaction skip this segment. rename " << src_seg_path << " to "
+              << dst_seg_path;
+    {
+        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
+        DCHECK_EQ(_segid_statistics_map.find(seg_id) == _segid_statistics_map.end(), false);
+        DCHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == _segid_statistics_map.end(),
+                  true);
+        Statistics org = _segid_statistics_map[seg_id];
+        _segid_statistics_map.emplace(_num_segcompacted, org);
+        _clear_statistics_for_deleting_segments_unsafe(seg_id, seg_id);
+    }
+    ++_num_segcompacted;
+    ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+    if (ret) {
+        LOG(WARNING) << "failed to rename " << src_seg_path << " to " << dst_seg_path;

Review Comment:
   log out ret and print errno,here.



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -102,6 +115,409 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) {
     return _add_block(block, &_segment_writer);
 }
 
+vectorized::VMergeIterator* BetaRowsetWriter::_get_segcompaction_reader(
+        SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> schema,
+        OlapReaderStatistics* stat, uint64_t* merged_row_stat) {
+    StorageReadOptions read_options;
+    read_options.stats = stat;
+    read_options.use_page_cache = false;
+    read_options.tablet_schema = _context.tablet_schema;
+    std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
+    for (auto& seg_ptr : *segments) {
+        std::unique_ptr<RowwiseIterator> iter;
+        auto s = seg_ptr->new_iterator(*schema, read_options, &iter);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << "]: " << s.to_string();
+            return nullptr;
+        }
+        seg_iterators.push_back(std::move(iter));
+    }
+    std::vector<RowwiseIterator*> iterators;
+    for (auto& owned_it : seg_iterators) {
+        // transfer ownership
+        iterators.push_back(owned_it.release());
+    }
+    bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS);
+    bool is_reverse = false;
+    auto merge_itr =
+            vectorized::new_merge_iterator(iterators, -1, is_unique, is_reverse, merged_row_stat);
+    DCHECK(merge_itr);
+    auto s = merge_itr->init(read_options);
+    if (!s.ok()) {
+        LOG(WARNING) << "failed to init iterator: " << s.to_string();
+        return nullptr;
+    }
+
+    return (vectorized::VMergeIterator*)merge_itr;
+}
+
+std::unique_ptr<segment_v2::SegmentWriter> BetaRowsetWriter::_create_segcompaction_writer(
+        uint64_t begin, uint64_t end) {
+    Status status;
+    std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr;
+    status = _create_segment_writer_for_segcompaction(&writer, begin, end);
+    if (status != Status::OK() || writer == nullptr) {
+        LOG(ERROR) << "failed to create segment writer for begin:" << begin << " end:" << end
+                   << " path:" << writer->get_data_dir()->path();
+        return nullptr;
+    } else {
+        return writer;
+    }
+}
+
+Status BetaRowsetWriter::_delete_original_segments(uint32_t begin, uint32_t end) {
+    auto fs = _rowset_meta->fs();
+    if (!fs) {
+        return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
+    }
+    for (uint32_t i = begin; i <= end; ++i) {
+        auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, i);
+        // Even if an error is encountered, these files that have not been cleaned up
+        // will be cleaned up by the GC background. So here we only print the error
+        // message when we encounter an error.
+        RETURN_NOT_OK_LOG(fs->delete_file(seg_path),
+                          strings::Substitute("Failed to delete file=$0", seg_path));
+    }
+    return Status::OK();
+}
+
+Status BetaRowsetWriter::_rename_compacted_segments(int64_t begin, int64_t end) {
+    int ret;
+    auto src_seg_path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path,
+                                                                    _context.rowset_id, begin, end);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id,
+                                                       _num_segcompacted++);
+    ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+    if (ret) {

Review Comment:
   print a log and errno.



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -236,6 +692,26 @@ RowsetSharedPtr BetaRowsetWriter::build() {
             return nullptr;
         }
     }
+    Status status;

Review Comment:
   Not related to this pr, we should close file_writer timely, otherwise we consumes a lot fds.



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -102,6 +115,409 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) {
     return _add_block(block, &_segment_writer);
 }
 
+vectorized::VMergeIterator* BetaRowsetWriter::_get_segcompaction_reader(
+        SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> schema,
+        OlapReaderStatistics* stat, uint64_t* merged_row_stat) {
+    StorageReadOptions read_options;
+    read_options.stats = stat;
+    read_options.use_page_cache = false;
+    read_options.tablet_schema = _context.tablet_schema;
+    std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
+    for (auto& seg_ptr : *segments) {
+        std::unique_ptr<RowwiseIterator> iter;
+        auto s = seg_ptr->new_iterator(*schema, read_options, &iter);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << "]: " << s.to_string();
+            return nullptr;
+        }
+        seg_iterators.push_back(std::move(iter));
+    }
+    std::vector<RowwiseIterator*> iterators;
+    for (auto& owned_it : seg_iterators) {
+        // transfer ownership
+        iterators.push_back(owned_it.release());
+    }
+    bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS);
+    bool is_reverse = false;
+    auto merge_itr =
+            vectorized::new_merge_iterator(iterators, -1, is_unique, is_reverse, merged_row_stat);
+    DCHECK(merge_itr);
+    auto s = merge_itr->init(read_options);
+    if (!s.ok()) {
+        LOG(WARNING) << "failed to init iterator: " << s.to_string();
+        return nullptr;
+    }
+
+    return (vectorized::VMergeIterator*)merge_itr;
+}
+
+std::unique_ptr<segment_v2::SegmentWriter> BetaRowsetWriter::_create_segcompaction_writer(
+        uint64_t begin, uint64_t end) {
+    Status status;
+    std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr;
+    status = _create_segment_writer_for_segcompaction(&writer, begin, end);
+    if (status != Status::OK() || writer == nullptr) {
+        LOG(ERROR) << "failed to create segment writer for begin:" << begin << " end:" << end
+                   << " path:" << writer->get_data_dir()->path();
+        return nullptr;
+    } else {
+        return writer;
+    }
+}
+
+Status BetaRowsetWriter::_delete_original_segments(uint32_t begin, uint32_t end) {
+    auto fs = _rowset_meta->fs();
+    if (!fs) {
+        return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
+    }
+    for (uint32_t i = begin; i <= end; ++i) {
+        auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, i);
+        // Even if an error is encountered, these files that have not been cleaned up
+        // will be cleaned up by the GC background. So here we only print the error
+        // message when we encounter an error.
+        RETURN_NOT_OK_LOG(fs->delete_file(seg_path),
+                          strings::Substitute("Failed to delete file=$0", seg_path));
+    }
+    return Status::OK();
+}
+
+Status BetaRowsetWriter::_rename_compacted_segments(int64_t begin, int64_t end) {
+    int ret;
+    auto src_seg_path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path,
+                                                                    _context.rowset_id, begin, end);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id,
+                                                       _num_segcompacted++);
+    ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+    if (ret) {
+        return Status::OLAPInternalError(OLAP_ERR_ROWSET_RENAME_FILE_FAILED);
+    }
+    return Status::OK();
+}
+
+Status BetaRowsetWriter::_rename_compacted_segment_plain(uint64_t seg_id) {
+    if (seg_id == _num_segcompacted) {
+        return Status::OK();
+    }
+
+    int ret;
+    auto src_seg_path =
+            BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, seg_id);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id,
+                                                       _num_segcompacted);
+    LOG(INFO) << "segcompaction skip this segment. rename " << src_seg_path << " to "
+              << dst_seg_path;
+    {
+        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
+        DCHECK_EQ(_segid_statistics_map.find(seg_id) == _segid_statistics_map.end(), false);
+        DCHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == _segid_statistics_map.end(),
+                  true);
+        Statistics org = _segid_statistics_map[seg_id];
+        _segid_statistics_map.emplace(_num_segcompacted, org);
+        _clear_statistics_for_deleting_segments_unsafe(seg_id, seg_id);
+    }
+    ++_num_segcompacted;
+    ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+    if (ret) {
+        LOG(WARNING) << "failed to rename " << src_seg_path << " to " << dst_seg_path;
+        return Status::OLAPInternalError(OLAP_ERR_ROWSET_RENAME_FILE_FAILED);
+    }
+    return Status::OK();
+}
+
+void BetaRowsetWriter::_clear_statistics_for_deleting_segments_unsafe(uint64_t begin,
+                                                                      uint64_t end) {
+    LOG(INFO) << "_segid_statistics_map clear record segid range from:" << begin << " to:" << end;
+    for (int i = begin; i <= end; ++i) {
+        _segid_statistics_map.erase(i);
+    }
+}
+
+Status BetaRowsetWriter::_check_correctness(std::unique_ptr<OlapReaderStatistics> stat,
+                                            uint64_t merged_row_stat, uint64_t row_count,
+                                            uint64_t begin, uint64_t end) {
+    uint64_t stat_read_row = stat->raw_rows_read;
+    uint64_t sum_target_row = 0;
+
+    {
+        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
+        for (int i = begin; i <= end; ++i) {
+            sum_target_row += _segid_statistics_map[i].row_num;
+        }
+    }
+
+    if (sum_target_row != stat_read_row) {
+        LOG(WARNING) << "read row_num does not match. expect read row:" << sum_target_row
+                     << " actual read row:" << stat_read_row;
+        return Status::OLAPInternalError(OLAP_ERR_CHECK_LINES_ERROR);
+    }
+
+    uint64_t total_row = row_count + merged_row_stat;
+    if (stat_read_row != total_row) {
+        LOG(WARNING) << "total row_num does not match. expect total row:" << total_row
+                     << " actual total row:" << stat_read_row;
+        return Status::OLAPInternalError(OLAP_ERR_CHECK_LINES_ERROR);
+    }
+    return Status::OK();
+}
+
+Status BetaRowsetWriter::_do_segcompaction(SegCompactionCandidatesSharedPtr segments) {
+    SCOPED_ATTACH_TASK(StorageEngine::instance()->segcompaction_mem_tracker(),
+                       ThreadContext::TaskType::COMPACTION);
+    // throttle segcompaction task if memory depleted.
+    if (MemTrackerLimiter::sys_mem_exceed_limit_check(GB_EXCHANGE_BYTE)) {
+        LOG(WARNING) << "skip segcompaction due to memory shortage";
+        return Status::MemoryLimitExceeded("skip segcompaction due to memory shortage");
+    }
+    uint64_t begin = (*(segments->begin()))->id();
+    uint64_t end = (*(segments->end() - 1))->id();
+    LOG(INFO) << "BetaRowsetWriter:" << this << " do segcompaction at " << segments->size()
+              << " segments. Begin:" << begin << " End:" << end;
+    uint64_t begin_time = GetCurrentTimeMicros();
+
+    auto schema = std::make_shared<Schema>(_context.tablet_schema->columns(),
+                                           _context.tablet_schema->columns().size());
+    std::unique_ptr<OlapReaderStatistics> stat(new OlapReaderStatistics());
+    uint64_t merged_row_stat = 0;
+    vectorized::VMergeIterator* reader =
+            _get_segcompaction_reader(segments, schema, stat.get(), &merged_row_stat);
+    if (UNLIKELY(reader == nullptr)) {
+        LOG(WARNING) << "failed to get segcompaction reader";
+        return Status::OLAPInternalError(OLAP_ERR_SEGCOMPACTION_INIT_READER);
+    }
+    std::unique_ptr<vectorized::VMergeIterator> reader_ptr;
+    reader_ptr.reset(reader);
+    auto writer = _create_segcompaction_writer(begin, end);
+    if (UNLIKELY(writer == nullptr)) {
+        LOG(WARNING) << "failed to get segcompaction writer";
+        return Status::OLAPInternalError(OLAP_ERR_SEGCOMPACTION_INIT_WRITER);
+    }
+    uint64_t row_count = 0;
+    vectorized::Block block = _context.tablet_schema->create_block();
+    while (true) {
+        auto status = reader_ptr->next_batch(&block);
+        row_count += block.rows();
+        if (status != Status::OK()) {
+            if (LIKELY(status.is_end_of_file())) {
+                RETURN_NOT_OK_LOG(_add_block_for_segcompaction(&block, &writer),
+                                  "write block failed");
+                break;
+            } else {
+                LOG(WARNING) << "read block failed: " << status.to_string();
+                return status;
+            }
+        }
+        RETURN_NOT_OK_LOG(_add_block_for_segcompaction(&block, &writer), "write block failed");
+        block.clear_column_data();
+    }
+    RETURN_NOT_OK_LOG(_check_correctness(std::move(stat), merged_row_stat, row_count, begin, end),
+                      "check correctness failed");
+    {
+        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
+        _clear_statistics_for_deleting_segments_unsafe(begin, end);
+    }
+    RETURN_NOT_OK(_flush_segment_writer(&writer));
+    RETURN_NOT_OK(_delete_original_segments(begin, end));
+    RETURN_NOT_OK(_rename_compacted_segments(begin, end));
+
+    if (VLOG_DEBUG_IS_ON) {
+        std::stringstream ss;
+        for (const auto& entry : std::filesystem::directory_iterator(_context.tablet_path)) {
+            ss << "[" << entry.path() << "]";
+        }
+        VLOG_DEBUG << "_segcompacted_point:" << _segcompacted_point
+                   << " _num_segment:" << _num_segment << " _num_segcompacted:" << _num_segcompacted
+                   << " list directory:" << ss.str();
+    }
+
+    _segcompacted_point += (end - begin + 1);
+
+    uint64_t elapsed = GetCurrentTimeMicros() - begin_time;
+    LOG(INFO) << "BetaRowsetWriter:" << this << " segcompaction completed. elapsed time:" << elapsed
+              << "us. _segcompacted_point update:" << _segcompacted_point;
+
+    return Status::OK();
+}
+
+void BetaRowsetWriter::segcompaction(SegCompactionCandidatesSharedPtr segments) {
+    Status status = _do_segcompaction(segments);
+    if (!status.ok()) {
+        int16_t errcode = status.precise_code();
+        switch (errcode) {
+        case OLAP_ERR_SEGCOMPACTION_INIT_READER:
+        case OLAP_ERR_SEGCOMPACTION_INIT_WRITER:
+            LOG(WARNING) << "segcompaction failed, try next time:" << status;

Review Comment:
   We should skip memory exceed limit too.



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -102,6 +115,409 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) {
     return _add_block(block, &_segment_writer);
 }
 
+vectorized::VMergeIterator* BetaRowsetWriter::_get_segcompaction_reader(
+        SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> schema,
+        OlapReaderStatistics* stat, uint64_t* merged_row_stat) {
+    StorageReadOptions read_options;
+    read_options.stats = stat;
+    read_options.use_page_cache = false;
+    read_options.tablet_schema = _context.tablet_schema;
+    std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
+    for (auto& seg_ptr : *segments) {
+        std::unique_ptr<RowwiseIterator> iter;
+        auto s = seg_ptr->new_iterator(*schema, read_options, &iter);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << "]: " << s.to_string();
+            return nullptr;
+        }
+        seg_iterators.push_back(std::move(iter));
+    }
+    std::vector<RowwiseIterator*> iterators;
+    for (auto& owned_it : seg_iterators) {
+        // transfer ownership
+        iterators.push_back(owned_it.release());
+    }
+    bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS);
+    bool is_reverse = false;
+    auto merge_itr =
+            vectorized::new_merge_iterator(iterators, -1, is_unique, is_reverse, merged_row_stat);
+    DCHECK(merge_itr);
+    auto s = merge_itr->init(read_options);
+    if (!s.ok()) {
+        LOG(WARNING) << "failed to init iterator: " << s.to_string();
+        return nullptr;
+    }
+
+    return (vectorized::VMergeIterator*)merge_itr;
+}
+
+std::unique_ptr<segment_v2::SegmentWriter> BetaRowsetWriter::_create_segcompaction_writer(
+        uint64_t begin, uint64_t end) {
+    Status status;
+    std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr;
+    status = _create_segment_writer_for_segcompaction(&writer, begin, end);
+    if (status != Status::OK() || writer == nullptr) {
+        LOG(ERROR) << "failed to create segment writer for begin:" << begin << " end:" << end
+                   << " path:" << writer->get_data_dir()->path();
+        return nullptr;
+    } else {
+        return writer;
+    }
+}
+
+Status BetaRowsetWriter::_delete_original_segments(uint32_t begin, uint32_t end) {
+    auto fs = _rowset_meta->fs();
+    if (!fs) {
+        return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
+    }
+    for (uint32_t i = begin; i <= end; ++i) {
+        auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, i);
+        // Even if an error is encountered, these files that have not been cleaned up
+        // will be cleaned up by the GC background. So here we only print the error
+        // message when we encounter an error.
+        RETURN_NOT_OK_LOG(fs->delete_file(seg_path),
+                          strings::Substitute("Failed to delete file=$0", seg_path));
+    }
+    return Status::OK();
+}
+
+Status BetaRowsetWriter::_rename_compacted_segments(int64_t begin, int64_t end) {
+    int ret;
+    auto src_seg_path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path,
+                                                                    _context.rowset_id, begin, end);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id,
+                                                       _num_segcompacted++);
+    ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+    if (ret) {
+        return Status::OLAPInternalError(OLAP_ERR_ROWSET_RENAME_FILE_FAILED);
+    }
+    return Status::OK();
+}
+
+Status BetaRowsetWriter::_rename_compacted_segment_plain(uint64_t seg_id) {
+    if (seg_id == _num_segcompacted) {
+        return Status::OK();
+    }
+
+    int ret;
+    auto src_seg_path =
+            BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, seg_id);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id,
+                                                       _num_segcompacted);
+    LOG(INFO) << "segcompaction skip this segment. rename " << src_seg_path << " to "
+              << dst_seg_path;
+    {
+        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
+        DCHECK_EQ(_segid_statistics_map.find(seg_id) == _segid_statistics_map.end(), false);
+        DCHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == _segid_statistics_map.end(),
+                  true);
+        Statistics org = _segid_statistics_map[seg_id];
+        _segid_statistics_map.emplace(_num_segcompacted, org);
+        _clear_statistics_for_deleting_segments_unsafe(seg_id, seg_id);
+    }
+    ++_num_segcompacted;
+    ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+    if (ret) {
+        LOG(WARNING) << "failed to rename " << src_seg_path << " to " << dst_seg_path;
+        return Status::OLAPInternalError(OLAP_ERR_ROWSET_RENAME_FILE_FAILED);
+    }
+    return Status::OK();
+}
+
+void BetaRowsetWriter::_clear_statistics_for_deleting_segments_unsafe(uint64_t begin,
+                                                                      uint64_t end) {
+    LOG(INFO) << "_segid_statistics_map clear record segid range from:" << begin << " to:" << end;
+    for (int i = begin; i <= end; ++i) {
+        _segid_statistics_map.erase(i);
+    }
+}
+
+Status BetaRowsetWriter::_check_correctness(std::unique_ptr<OlapReaderStatistics> stat,
+                                            uint64_t merged_row_stat, uint64_t row_count,
+                                            uint64_t begin, uint64_t end) {
+    uint64_t stat_read_row = stat->raw_rows_read;
+    uint64_t sum_target_row = 0;
+
+    {
+        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
+        for (int i = begin; i <= end; ++i) {
+            sum_target_row += _segid_statistics_map[i].row_num;
+        }
+    }
+
+    if (sum_target_row != stat_read_row) {
+        LOG(WARNING) << "read row_num does not match. expect read row:" << sum_target_row
+                     << " actual read row:" << stat_read_row;
+        return Status::OLAPInternalError(OLAP_ERR_CHECK_LINES_ERROR);
+    }
+
+    uint64_t total_row = row_count + merged_row_stat;
+    if (stat_read_row != total_row) {
+        LOG(WARNING) << "total row_num does not match. expect total row:" << total_row
+                     << " actual total row:" << stat_read_row;
+        return Status::OLAPInternalError(OLAP_ERR_CHECK_LINES_ERROR);
+    }
+    return Status::OK();
+}
+
+Status BetaRowsetWriter::_do_segcompaction(SegCompactionCandidatesSharedPtr segments) {
+    SCOPED_ATTACH_TASK(StorageEngine::instance()->segcompaction_mem_tracker(),
+                       ThreadContext::TaskType::COMPACTION);
+    // throttle segcompaction task if memory depleted.
+    if (MemTrackerLimiter::sys_mem_exceed_limit_check(GB_EXCHANGE_BYTE)) {
+        LOG(WARNING) << "skip segcompaction due to memory shortage";
+        return Status::MemoryLimitExceeded("skip segcompaction due to memory shortage");
+    }
+    uint64_t begin = (*(segments->begin()))->id();
+    uint64_t end = (*(segments->end() - 1))->id();
+    LOG(INFO) << "BetaRowsetWriter:" << this << " do segcompaction at " << segments->size()
+              << " segments. Begin:" << begin << " End:" << end;
+    uint64_t begin_time = GetCurrentTimeMicros();
+
+    auto schema = std::make_shared<Schema>(_context.tablet_schema->columns(),
+                                           _context.tablet_schema->columns().size());
+    std::unique_ptr<OlapReaderStatistics> stat(new OlapReaderStatistics());
+    uint64_t merged_row_stat = 0;
+    vectorized::VMergeIterator* reader =
+            _get_segcompaction_reader(segments, schema, stat.get(), &merged_row_stat);
+    if (UNLIKELY(reader == nullptr)) {
+        LOG(WARNING) << "failed to get segcompaction reader";
+        return Status::OLAPInternalError(OLAP_ERR_SEGCOMPACTION_INIT_READER);
+    }
+    std::unique_ptr<vectorized::VMergeIterator> reader_ptr;
+    reader_ptr.reset(reader);
+    auto writer = _create_segcompaction_writer(begin, end);
+    if (UNLIKELY(writer == nullptr)) {
+        LOG(WARNING) << "failed to get segcompaction writer";
+        return Status::OLAPInternalError(OLAP_ERR_SEGCOMPACTION_INIT_WRITER);
+    }
+    uint64_t row_count = 0;
+    vectorized::Block block = _context.tablet_schema->create_block();
+    while (true) {
+        auto status = reader_ptr->next_batch(&block);
+        row_count += block.rows();
+        if (status != Status::OK()) {
+            if (LIKELY(status.is_end_of_file())) {
+                RETURN_NOT_OK_LOG(_add_block_for_segcompaction(&block, &writer),
+                                  "write block failed");
+                break;
+            } else {
+                LOG(WARNING) << "read block failed: " << status.to_string();
+                return status;
+            }
+        }
+        RETURN_NOT_OK_LOG(_add_block_for_segcompaction(&block, &writer), "write block failed");
+        block.clear_column_data();
+    }
+    RETURN_NOT_OK_LOG(_check_correctness(std::move(stat), merged_row_stat, row_count, begin, end),
+                      "check correctness failed");
+    {
+        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
+        _clear_statistics_for_deleting_segments_unsafe(begin, end);
+    }
+    RETURN_NOT_OK(_flush_segment_writer(&writer));
+    RETURN_NOT_OK(_delete_original_segments(begin, end));
+    RETURN_NOT_OK(_rename_compacted_segments(begin, end));
+
+    if (VLOG_DEBUG_IS_ON) {
+        std::stringstream ss;
+        for (const auto& entry : std::filesystem::directory_iterator(_context.tablet_path)) {
+            ss << "[" << entry.path() << "]";
+        }
+        VLOG_DEBUG << "_segcompacted_point:" << _segcompacted_point
+                   << " _num_segment:" << _num_segment << " _num_segcompacted:" << _num_segcompacted
+                   << " list directory:" << ss.str();
+    }
+
+    _segcompacted_point += (end - begin + 1);
+
+    uint64_t elapsed = GetCurrentTimeMicros() - begin_time;
+    LOG(INFO) << "BetaRowsetWriter:" << this << " segcompaction completed. elapsed time:" << elapsed
+              << "us. _segcompacted_point update:" << _segcompacted_point;
+
+    return Status::OK();
+}
+
+void BetaRowsetWriter::segcompaction(SegCompactionCandidatesSharedPtr segments) {
+    Status status = _do_segcompaction(segments);
+    if (!status.ok()) {
+        int16_t errcode = status.precise_code();
+        switch (errcode) {
+        case OLAP_ERR_SEGCOMPACTION_INIT_READER:
+        case OLAP_ERR_SEGCOMPACTION_INIT_WRITER:
+            LOG(WARNING) << "segcompaction failed, try next time:" << status;
+            return;
+        default:
+            LOG(WARNING) << "segcompaction fatal, terminating the write job:" << status;
+            // status will be checked by the next trigger of segcompaction or the final wait
+            _segcompaction_status.store(OLAP_ERR_OTHER_ERROR);
+        }
+    }
+    DCHECK_EQ(_is_doing_segcompaction, true);
+    _is_doing_segcompaction = false;
+    _segcompacting_cond.notify_all();
+}
+
+Status BetaRowsetWriter::_load_noncompacted_segments(
+        std::vector<segment_v2::SegmentSharedPtr>* segments) {
+    auto fs = _rowset_meta->fs();
+    if (!fs) {
+        return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
+    }
+    for (int seg_id = _segcompacted_point; seg_id < _num_segment; ++seg_id) {
+        auto seg_path =
+                BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, seg_id);
+        auto cache_path =
+                BetaRowset::local_cache_path(_context.tablet_path, _context.rowset_id, seg_id);
+        std::shared_ptr<segment_v2::Segment> segment;
+        auto s = segment_v2::Segment::open(fs, seg_path, cache_path, seg_id, _context.tablet_schema,
+                                           &segment);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to open segment. " << seg_path << ":" << s.to_string();
+            return Status::OLAPInternalError(OLAP_ERR_ROWSET_LOAD_FAILED);
+        }
+        segments->push_back(std::move(segment));
+    }
+    return Status::OK();
+}
+
+/* policy of segcompaction target selection:
+ *  1. skip big segments
+ *  2. if the consecutive smalls end up with a big, compact the smalls, except
+ *     single small
+ *  3. if the consecutive smalls end up with small, compact the smalls if the
+ *     length is beyond (config::segcompaction_threshold_segment_num / 2)
+ */
+Status BetaRowsetWriter::_find_longest_consecutive_small_segment(
+        SegCompactionCandidatesSharedPtr segments) {
+    std::vector<segment_v2::SegmentSharedPtr> all_segments;
+    RETURN_NOT_OK(_load_noncompacted_segments(&all_segments));
+
+    std::stringstream ss_all;
+    for (auto& segment : all_segments) {
+        ss_all << "[id:" << segment->id() << " num_rows:" << segment->num_rows() << "]";
+    }
+    LOG(INFO) << "all noncompacted segments num:" << all_segments.size()
+              << " list of segments:" << ss_all.str();
+
+    bool is_terminated_by_big = false;
+    bool let_big_terminate = false;
+    size_t small_threshold = config::segcompaction_small_threshold;
+    for (int64_t i = 0; i < all_segments.size(); ++i) {
+        segment_v2::SegmentSharedPtr seg = all_segments[i];
+        if (seg->num_rows() > small_threshold) {
+            if (let_big_terminate) {
+                is_terminated_by_big = true;
+                break;
+            } else {
+                RETURN_NOT_OK(_rename_compacted_segment_plain(_segcompacted_point++));
+            }
+        } else {
+            let_big_terminate = true; // break if find a big after small
+            segments->push_back(seg);
+        }
+    }
+    size_t s = segments->size();
+    if (!is_terminated_by_big && s <= (config::segcompaction_threshold_segment_num / 2)) {
+        // start with big segments and end with small, better to do it in next
+        // round to compact more at once
+        segments->clear();
+        return Status::OK();
+    }
+    if (s == 1) { // poor bachelor, let it go
+        LOG(INFO) << "only one candidate segment";
+        RETURN_NOT_OK(_rename_compacted_segment_plain(_segcompacted_point++));
+        segments->clear();
+        return Status::OK();
+    }
+    std::stringstream ss;
+    for (auto& segment : (*segments.get())) {
+        ss << "[id:" << segment->id() << " num_rows:" << segment->num_rows() << "]";
+    }
+    LOG(INFO) << "candidate segments num:" << s << " list of candidates:" << ss.str();
+    return Status::OK();
+}
+
+Status BetaRowsetWriter::_get_segcompaction_candidates(SegCompactionCandidatesSharedPtr& segments,
+                                                       bool is_last) {
+    if (is_last) {
+        LOG(INFO) << "segcompaction last few segments";

Review Comment:
   remove useless log or change it to debug.



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -102,6 +115,409 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) {
     return _add_block(block, &_segment_writer);
 }
 
+vectorized::VMergeIterator* BetaRowsetWriter::_get_segcompaction_reader(
+        SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> schema,
+        OlapReaderStatistics* stat, uint64_t* merged_row_stat) {
+    StorageReadOptions read_options;
+    read_options.stats = stat;
+    read_options.use_page_cache = false;
+    read_options.tablet_schema = _context.tablet_schema;
+    std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
+    for (auto& seg_ptr : *segments) {
+        std::unique_ptr<RowwiseIterator> iter;
+        auto s = seg_ptr->new_iterator(*schema, read_options, &iter);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << "]: " << s.to_string();
+            return nullptr;
+        }
+        seg_iterators.push_back(std::move(iter));
+    }
+    std::vector<RowwiseIterator*> iterators;
+    for (auto& owned_it : seg_iterators) {
+        // transfer ownership
+        iterators.push_back(owned_it.release());
+    }
+    bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS);
+    bool is_reverse = false;
+    auto merge_itr =
+            vectorized::new_merge_iterator(iterators, -1, is_unique, is_reverse, merged_row_stat);
+    DCHECK(merge_itr);
+    auto s = merge_itr->init(read_options);
+    if (!s.ok()) {
+        LOG(WARNING) << "failed to init iterator: " << s.to_string();

Review Comment:
   We should release iterators here.



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -332,7 +851,31 @@ Status BetaRowsetWriter::_create_segment_writer(
     return Status::OK();
 }
 
-Status BetaRowsetWriter::_flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer) {
+Status BetaRowsetWriter::_create_segment_writer(
+        std::unique_ptr<segment_v2::SegmentWriter>* writer) {
+    size_t total_segment_num = _num_segment - _segcompacted_point + 1 + _num_segcompacted;
+    if (UNLIKELY(total_segment_num > config::max_segment_num_per_rowset)) {
+        LOG(ERROR) << "too many segments in rowset."
+                   << " max:" << config::max_segment_num_per_rowset
+                   << " _num_segment:" << _num_segment
+                   << " _segcompacted_point:" << _segcompacted_point
+                   << " _num_segcompacted:" << _num_segcompacted;
+        return Status::OLAPInternalError(OLAP_ERR_TOO_MANY_SEGMENTS);

Review Comment:
   I do not think we should return TOO MANY SEGMENTS here, we can coninue write, and do segment compaction when write finished. and we shoud do this job in a new pull request.
   
   e.g. If I write a lot data 100T and there would be many segments. we can ingore this error and continue, and do segcompaction in backgroud, when write is finished betarowset writers would be closed, we can compact segment then until it number of segments meet the requirements.



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -102,6 +115,409 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) {
     return _add_block(block, &_segment_writer);
 }
 
+vectorized::VMergeIterator* BetaRowsetWriter::_get_segcompaction_reader(
+        SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> schema,
+        OlapReaderStatistics* stat, uint64_t* merged_row_stat) {
+    StorageReadOptions read_options;
+    read_options.stats = stat;
+    read_options.use_page_cache = false;
+    read_options.tablet_schema = _context.tablet_schema;
+    std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
+    for (auto& seg_ptr : *segments) {
+        std::unique_ptr<RowwiseIterator> iter;
+        auto s = seg_ptr->new_iterator(*schema, read_options, &iter);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << "]: " << s.to_string();
+            return nullptr;
+        }
+        seg_iterators.push_back(std::move(iter));
+    }
+    std::vector<RowwiseIterator*> iterators;
+    for (auto& owned_it : seg_iterators) {
+        // transfer ownership
+        iterators.push_back(owned_it.release());
+    }
+    bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS);
+    bool is_reverse = false;
+    auto merge_itr =
+            vectorized::new_merge_iterator(iterators, -1, is_unique, is_reverse, merged_row_stat);
+    DCHECK(merge_itr);
+    auto s = merge_itr->init(read_options);
+    if (!s.ok()) {
+        LOG(WARNING) << "failed to init iterator: " << s.to_string();
+        return nullptr;
+    }
+
+    return (vectorized::VMergeIterator*)merge_itr;
+}
+
+std::unique_ptr<segment_v2::SegmentWriter> BetaRowsetWriter::_create_segcompaction_writer(
+        uint64_t begin, uint64_t end) {
+    Status status;
+    std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr;
+    status = _create_segment_writer_for_segcompaction(&writer, begin, end);
+    if (status != Status::OK() || writer == nullptr) {
+        LOG(ERROR) << "failed to create segment writer for begin:" << begin << " end:" << end
+                   << " path:" << writer->get_data_dir()->path();

Review Comment:
   log status here.



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -315,12 +823,23 @@ Status BetaRowsetWriter::_create_segment_writer(
     DCHECK(file_writer != nullptr);
     segment_v2::SegmentWriterOptions writer_options;
     writer_options.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write;
-    writer->reset(new segment_v2::SegmentWriter(file_writer.get(), segment_id,
-                                                _context.tablet_schema, _context.data_dir,
-                                                _context.max_rows_per_segment, writer_options));
-    {
-        std::lock_guard<SpinLock> l(_lock);
-        _file_writers.push_back(std::move(file_writer));
+
+    if (is_segcompaction) {
+        writer->reset(new segment_v2::SegmentWriter(file_writer.get(), _num_segcompacted,
+                                                    _context.tablet_schema, _context.data_dir,
+                                                    _context.max_rows_per_segment, writer_options));
+        if (_segcompaction_file_writer != nullptr) {

Review Comment:
   If we have a segcompaction_file_writer here, does that mean last segment compaction is failed?



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -236,6 +692,26 @@ RowsetSharedPtr BetaRowsetWriter::build() {
             return nullptr;
         }
     }
+    Status status;
+    status = _wait_flying_segcompaction();
+    if (!status.ok()) {
+        LOG(WARNING) << "segcompaction failed when build new rowset, res=" << status;
+        return nullptr;
+    }
+    status = _segcompaction_ramaining_if_necessary();
+    if (!status.ok()) {
+        LOG(WARNING) << "segcompaction failed when build new rowset, res=" << status;

Review Comment:
   We should not return error due to segcompaction failure?



##########
be/src/olap/storage_engine.h:
##########
@@ -49,6 +49,7 @@
 #include "util/countdown_latch.h"
 #include "util/thread.h"
 #include "util/threadpool.h"
+//#include "olap/rowset/beta_rowset_writer.h"
 

Review Comment:
   remove useless code line。



##########
be/src/olap/olap_server.cpp:
##########
@@ -701,6 +709,19 @@ Status StorageEngine::submit_quick_compaction_task(TabletSharedPtr tablet) {
     return Status::OK();
 }
 
+Status StorageEngine::_handle_seg_compaction(BetaRowsetWriter* writer,
+                                             SegCompactionCandidatesSharedPtr segments) {
+    writer->segcompaction(segments);

Review Comment:
   compact_segments is a better name?



##########
be/src/olap/olap_server.cpp:
##########
@@ -701,6 +709,19 @@ Status StorageEngine::submit_quick_compaction_task(TabletSharedPtr tablet) {
     return Status::OK();
 }
 
+Status StorageEngine::_handle_seg_compaction(BetaRowsetWriter* writer,
+                                             SegCompactionCandidatesSharedPtr segments) {
+    writer->segcompaction(segments);
+    // return OK here. error will be reported via BetaRowsetWriter::_segcompaction_status
+    return Status::OK();
+}
+
+Status StorageEngine::submit_seg_compaction_task(BetaRowsetWriter* writer,
+                                                 SegCompactionCandidatesSharedPtr segments) {
+    return _seg_compaction_thread_pool->submit_func(
+            std::bind<void>(&StorageEngine::_handle_seg_compaction, this, writer, segments));
+}
+

Review Comment:
   or cancel it.



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -102,6 +115,409 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) {
     return _add_block(block, &_segment_writer);
 }
 
+vectorized::VMergeIterator* BetaRowsetWriter::_get_segcompaction_reader(
+        SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> schema,
+        OlapReaderStatistics* stat, uint64_t* merged_row_stat) {
+    StorageReadOptions read_options;
+    read_options.stats = stat;
+    read_options.use_page_cache = false;
+    read_options.tablet_schema = _context.tablet_schema;
+    std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
+    for (auto& seg_ptr : *segments) {
+        std::unique_ptr<RowwiseIterator> iter;
+        auto s = seg_ptr->new_iterator(*schema, read_options, &iter);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << "]: " << s.to_string();
+            return nullptr;
+        }
+        seg_iterators.push_back(std::move(iter));
+    }
+    std::vector<RowwiseIterator*> iterators;
+    for (auto& owned_it : seg_iterators) {
+        // transfer ownership
+        iterators.push_back(owned_it.release());
+    }
+    bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS);
+    bool is_reverse = false;
+    auto merge_itr =
+            vectorized::new_merge_iterator(iterators, -1, is_unique, is_reverse, merged_row_stat);
+    DCHECK(merge_itr);
+    auto s = merge_itr->init(read_options);
+    if (!s.ok()) {
+        LOG(WARNING) << "failed to init iterator: " << s.to_string();
+        return nullptr;
+    }
+
+    return (vectorized::VMergeIterator*)merge_itr;
+}
+
+std::unique_ptr<segment_v2::SegmentWriter> BetaRowsetWriter::_create_segcompaction_writer(
+        uint64_t begin, uint64_t end) {
+    Status status;
+    std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr;
+    status = _create_segment_writer_for_segcompaction(&writer, begin, end);
+    if (status != Status::OK() || writer == nullptr) {
+        LOG(ERROR) << "failed to create segment writer for begin:" << begin << " end:" << end
+                   << " path:" << writer->get_data_dir()->path();
+        return nullptr;
+    } else {
+        return writer;
+    }
+}
+
+Status BetaRowsetWriter::_delete_original_segments(uint32_t begin, uint32_t end) {
+    auto fs = _rowset_meta->fs();
+    if (!fs) {
+        return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
+    }
+    for (uint32_t i = begin; i <= end; ++i) {
+        auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, i);
+        // Even if an error is encountered, these files that have not been cleaned up
+        // will be cleaned up by the GC background. So here we only print the error
+        // message when we encounter an error.
+        RETURN_NOT_OK_LOG(fs->delete_file(seg_path),
+                          strings::Substitute("Failed to delete file=$0", seg_path));
+    }
+    return Status::OK();
+}
+
+Status BetaRowsetWriter::_rename_compacted_segments(int64_t begin, int64_t end) {
+    int ret;
+    auto src_seg_path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path,
+                                                                    _context.rowset_id, begin, end);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id,
+                                                       _num_segcompacted++);
+    ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+    if (ret) {
+        return Status::OLAPInternalError(OLAP_ERR_ROWSET_RENAME_FILE_FAILED);
+    }
+    return Status::OK();
+}
+
+Status BetaRowsetWriter::_rename_compacted_segment_plain(uint64_t seg_id) {
+    if (seg_id == _num_segcompacted) {
+        return Status::OK();
+    }
+
+    int ret;
+    auto src_seg_path =
+            BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, seg_id);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id,
+                                                       _num_segcompacted);
+    LOG(INFO) << "segcompaction skip this segment. rename " << src_seg_path << " to "
+              << dst_seg_path;
+    {
+        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
+        DCHECK_EQ(_segid_statistics_map.find(seg_id) == _segid_statistics_map.end(), false);
+        DCHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == _segid_statistics_map.end(),
+                  true);
+        Statistics org = _segid_statistics_map[seg_id];
+        _segid_statistics_map.emplace(_num_segcompacted, org);
+        _clear_statistics_for_deleting_segments_unsafe(seg_id, seg_id);
+    }
+    ++_num_segcompacted;
+    ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+    if (ret) {
+        LOG(WARNING) << "failed to rename " << src_seg_path << " to " << dst_seg_path;
+        return Status::OLAPInternalError(OLAP_ERR_ROWSET_RENAME_FILE_FAILED);
+    }
+    return Status::OK();
+}
+
+void BetaRowsetWriter::_clear_statistics_for_deleting_segments_unsafe(uint64_t begin,
+                                                                      uint64_t end) {
+    LOG(INFO) << "_segid_statistics_map clear record segid range from:" << begin << " to:" << end;
+    for (int i = begin; i <= end; ++i) {
+        _segid_statistics_map.erase(i);
+    }
+}
+
+Status BetaRowsetWriter::_check_correctness(std::unique_ptr<OlapReaderStatistics> stat,
+                                            uint64_t merged_row_stat, uint64_t row_count,
+                                            uint64_t begin, uint64_t end) {
+    uint64_t stat_read_row = stat->raw_rows_read;
+    uint64_t sum_target_row = 0;
+
+    {
+        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
+        for (int i = begin; i <= end; ++i) {
+            sum_target_row += _segid_statistics_map[i].row_num;
+        }
+    }
+
+    if (sum_target_row != stat_read_row) {
+        LOG(WARNING) << "read row_num does not match. expect read row:" << sum_target_row
+                     << " actual read row:" << stat_read_row;
+        return Status::OLAPInternalError(OLAP_ERR_CHECK_LINES_ERROR);
+    }
+
+    uint64_t total_row = row_count + merged_row_stat;
+    if (stat_read_row != total_row) {
+        LOG(WARNING) << "total row_num does not match. expect total row:" << total_row
+                     << " actual total row:" << stat_read_row;
+        return Status::OLAPInternalError(OLAP_ERR_CHECK_LINES_ERROR);
+    }
+    return Status::OK();
+}
+
+Status BetaRowsetWriter::_do_segcompaction(SegCompactionCandidatesSharedPtr segments) {
+    SCOPED_ATTACH_TASK(StorageEngine::instance()->segcompaction_mem_tracker(),
+                       ThreadContext::TaskType::COMPACTION);
+    // throttle segcompaction task if memory depleted.
+    if (MemTrackerLimiter::sys_mem_exceed_limit_check(GB_EXCHANGE_BYTE)) {
+        LOG(WARNING) << "skip segcompaction due to memory shortage";
+        return Status::MemoryLimitExceeded("skip segcompaction due to memory shortage");
+    }
+    uint64_t begin = (*(segments->begin()))->id();
+    uint64_t end = (*(segments->end() - 1))->id();
+    LOG(INFO) << "BetaRowsetWriter:" << this << " do segcompaction at " << segments->size()
+              << " segments. Begin:" << begin << " End:" << end;
+    uint64_t begin_time = GetCurrentTimeMicros();
+
+    auto schema = std::make_shared<Schema>(_context.tablet_schema->columns(),
+                                           _context.tablet_schema->columns().size());
+    std::unique_ptr<OlapReaderStatistics> stat(new OlapReaderStatistics());
+    uint64_t merged_row_stat = 0;
+    vectorized::VMergeIterator* reader =
+            _get_segcompaction_reader(segments, schema, stat.get(), &merged_row_stat);
+    if (UNLIKELY(reader == nullptr)) {
+        LOG(WARNING) << "failed to get segcompaction reader";
+        return Status::OLAPInternalError(OLAP_ERR_SEGCOMPACTION_INIT_READER);
+    }
+    std::unique_ptr<vectorized::VMergeIterator> reader_ptr;
+    reader_ptr.reset(reader);
+    auto writer = _create_segcompaction_writer(begin, end);
+    if (UNLIKELY(writer == nullptr)) {
+        LOG(WARNING) << "failed to get segcompaction writer";
+        return Status::OLAPInternalError(OLAP_ERR_SEGCOMPACTION_INIT_WRITER);
+    }
+    uint64_t row_count = 0;
+    vectorized::Block block = _context.tablet_schema->create_block();
+    while (true) {
+        auto status = reader_ptr->next_batch(&block);
+        row_count += block.rows();
+        if (status != Status::OK()) {
+            if (LIKELY(status.is_end_of_file())) {
+                RETURN_NOT_OK_LOG(_add_block_for_segcompaction(&block, &writer),
+                                  "write block failed");
+                break;
+            } else {
+                LOG(WARNING) << "read block failed: " << status.to_string();
+                return status;
+            }
+        }
+        RETURN_NOT_OK_LOG(_add_block_for_segcompaction(&block, &writer), "write block failed");
+        block.clear_column_data();
+    }
+    RETURN_NOT_OK_LOG(_check_correctness(std::move(stat), merged_row_stat, row_count, begin, end),
+                      "check correctness failed");
+    {
+        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
+        _clear_statistics_for_deleting_segments_unsafe(begin, end);
+    }
+    RETURN_NOT_OK(_flush_segment_writer(&writer));
+    RETURN_NOT_OK(_delete_original_segments(begin, end));
+    RETURN_NOT_OK(_rename_compacted_segments(begin, end));
+
+    if (VLOG_DEBUG_IS_ON) {
+        std::stringstream ss;
+        for (const auto& entry : std::filesystem::directory_iterator(_context.tablet_path)) {
+            ss << "[" << entry.path() << "]";
+        }
+        VLOG_DEBUG << "_segcompacted_point:" << _segcompacted_point
+                   << " _num_segment:" << _num_segment << " _num_segcompacted:" << _num_segcompacted
+                   << " list directory:" << ss.str();
+    }
+
+    _segcompacted_point += (end - begin + 1);
+
+    uint64_t elapsed = GetCurrentTimeMicros() - begin_time;
+    LOG(INFO) << "BetaRowsetWriter:" << this << " segcompaction completed. elapsed time:" << elapsed
+              << "us. _segcompacted_point update:" << _segcompacted_point;
+
+    return Status::OK();
+}
+
+void BetaRowsetWriter::segcompaction(SegCompactionCandidatesSharedPtr segments) {
+    Status status = _do_segcompaction(segments);
+    if (!status.ok()) {
+        int16_t errcode = status.precise_code();
+        switch (errcode) {
+        case OLAP_ERR_SEGCOMPACTION_INIT_READER:
+        case OLAP_ERR_SEGCOMPACTION_INIT_WRITER:
+            LOG(WARNING) << "segcompaction failed, try next time:" << status;
+            return;
+        default:
+            LOG(WARNING) << "segcompaction fatal, terminating the write job:" << status;
+            // status will be checked by the next trigger of segcompaction or the final wait
+            _segcompaction_status.store(OLAP_ERR_OTHER_ERROR);
+        }
+    }
+    DCHECK_EQ(_is_doing_segcompaction, true);
+    _is_doing_segcompaction = false;
+    _segcompacting_cond.notify_all();
+}
+
+Status BetaRowsetWriter::_load_noncompacted_segments(
+        std::vector<segment_v2::SegmentSharedPtr>* segments) {
+    auto fs = _rowset_meta->fs();
+    if (!fs) {
+        return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
+    }
+    for (int seg_id = _segcompacted_point; seg_id < _num_segment; ++seg_id) {
+        auto seg_path =
+                BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, seg_id);
+        auto cache_path =
+                BetaRowset::local_cache_path(_context.tablet_path, _context.rowset_id, seg_id);
+        std::shared_ptr<segment_v2::Segment> segment;
+        auto s = segment_v2::Segment::open(fs, seg_path, cache_path, seg_id, _context.tablet_schema,
+                                           &segment);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to open segment. " << seg_path << ":" << s.to_string();
+            return Status::OLAPInternalError(OLAP_ERR_ROWSET_LOAD_FAILED);
+        }
+        segments->push_back(std::move(segment));
+    }
+    return Status::OK();
+}
+
+/* policy of segcompaction target selection:
+ *  1. skip big segments
+ *  2. if the consecutive smalls end up with a big, compact the smalls, except
+ *     single small
+ *  3. if the consecutive smalls end up with small, compact the smalls if the
+ *     length is beyond (config::segcompaction_threshold_segment_num / 2)
+ */
+Status BetaRowsetWriter::_find_longest_consecutive_small_segment(
+        SegCompactionCandidatesSharedPtr segments) {
+    std::vector<segment_v2::SegmentSharedPtr> all_segments;
+    RETURN_NOT_OK(_load_noncompacted_segments(&all_segments));
+
+    std::stringstream ss_all;
+    for (auto& segment : all_segments) {
+        ss_all << "[id:" << segment->id() << " num_rows:" << segment->num_rows() << "]";
+    }
+    LOG(INFO) << "all noncompacted segments num:" << all_segments.size()
+              << " list of segments:" << ss_all.str();
+
+    bool is_terminated_by_big = false;
+    bool let_big_terminate = false;
+    size_t small_threshold = config::segcompaction_small_threshold;
+    for (int64_t i = 0; i < all_segments.size(); ++i) {
+        segment_v2::SegmentSharedPtr seg = all_segments[i];
+        if (seg->num_rows() > small_threshold) {
+            if (let_big_terminate) {
+                is_terminated_by_big = true;
+                break;
+            } else {
+                RETURN_NOT_OK(_rename_compacted_segment_plain(_segcompacted_point++));
+            }
+        } else {
+            let_big_terminate = true; // break if find a big after small
+            segments->push_back(seg);
+        }
+    }
+    size_t s = segments->size();
+    if (!is_terminated_by_big && s <= (config::segcompaction_threshold_segment_num / 2)) {
+        // start with big segments and end with small, better to do it in next
+        // round to compact more at once
+        segments->clear();
+        return Status::OK();
+    }
+    if (s == 1) { // poor bachelor, let it go
+        LOG(INFO) << "only one candidate segment";

Review Comment:
   INFO -> DEBUG or VLOG



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -102,6 +115,409 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) {
     return _add_block(block, &_segment_writer);
 }
 
+vectorized::VMergeIterator* BetaRowsetWriter::_get_segcompaction_reader(
+        SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> schema,
+        OlapReaderStatistics* stat, uint64_t* merged_row_stat) {
+    StorageReadOptions read_options;
+    read_options.stats = stat;
+    read_options.use_page_cache = false;
+    read_options.tablet_schema = _context.tablet_schema;
+    std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
+    for (auto& seg_ptr : *segments) {
+        std::unique_ptr<RowwiseIterator> iter;
+        auto s = seg_ptr->new_iterator(*schema, read_options, &iter);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << "]: " << s.to_string();
+            return nullptr;
+        }
+        seg_iterators.push_back(std::move(iter));
+    }
+    std::vector<RowwiseIterator*> iterators;
+    for (auto& owned_it : seg_iterators) {
+        // transfer ownership
+        iterators.push_back(owned_it.release());
+    }
+    bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS);
+    bool is_reverse = false;
+    auto merge_itr =
+            vectorized::new_merge_iterator(iterators, -1, is_unique, is_reverse, merged_row_stat);
+    DCHECK(merge_itr);
+    auto s = merge_itr->init(read_options);
+    if (!s.ok()) {
+        LOG(WARNING) << "failed to init iterator: " << s.to_string();
+        return nullptr;
+    }
+
+    return (vectorized::VMergeIterator*)merge_itr;
+}
+
+std::unique_ptr<segment_v2::SegmentWriter> BetaRowsetWriter::_create_segcompaction_writer(
+        uint64_t begin, uint64_t end) {
+    Status status;
+    std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr;
+    status = _create_segment_writer_for_segcompaction(&writer, begin, end);
+    if (status != Status::OK() || writer == nullptr) {
+        LOG(ERROR) << "failed to create segment writer for begin:" << begin << " end:" << end
+                   << " path:" << writer->get_data_dir()->path();
+        return nullptr;
+    } else {
+        return writer;
+    }
+}
+
+Status BetaRowsetWriter::_delete_original_segments(uint32_t begin, uint32_t end) {
+    auto fs = _rowset_meta->fs();
+    if (!fs) {
+        return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
+    }
+    for (uint32_t i = begin; i <= end; ++i) {
+        auto seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, i);
+        // Even if an error is encountered, these files that have not been cleaned up
+        // will be cleaned up by the GC background. So here we only print the error
+        // message when we encounter an error.
+        RETURN_NOT_OK_LOG(fs->delete_file(seg_path),
+                          strings::Substitute("Failed to delete file=$0", seg_path));
+    }
+    return Status::OK();
+}
+
+Status BetaRowsetWriter::_rename_compacted_segments(int64_t begin, int64_t end) {
+    int ret;
+    auto src_seg_path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path,
+                                                                    _context.rowset_id, begin, end);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id,
+                                                       _num_segcompacted++);
+    ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+    if (ret) {
+        return Status::OLAPInternalError(OLAP_ERR_ROWSET_RENAME_FILE_FAILED);
+    }
+    return Status::OK();
+}
+
+Status BetaRowsetWriter::_rename_compacted_segment_plain(uint64_t seg_id) {
+    if (seg_id == _num_segcompacted) {
+        return Status::OK();
+    }
+
+    int ret;
+    auto src_seg_path =
+            BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, seg_id);
+    auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id,
+                                                       _num_segcompacted);
+    LOG(INFO) << "segcompaction skip this segment. rename " << src_seg_path << " to "
+              << dst_seg_path;
+    {
+        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
+        DCHECK_EQ(_segid_statistics_map.find(seg_id) == _segid_statistics_map.end(), false);
+        DCHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == _segid_statistics_map.end(),
+                  true);
+        Statistics org = _segid_statistics_map[seg_id];
+        _segid_statistics_map.emplace(_num_segcompacted, org);
+        _clear_statistics_for_deleting_segments_unsafe(seg_id, seg_id);
+    }
+    ++_num_segcompacted;
+    ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+    if (ret) {
+        LOG(WARNING) << "failed to rename " << src_seg_path << " to " << dst_seg_path;
+        return Status::OLAPInternalError(OLAP_ERR_ROWSET_RENAME_FILE_FAILED);
+    }
+    return Status::OK();
+}
+
+void BetaRowsetWriter::_clear_statistics_for_deleting_segments_unsafe(uint64_t begin,
+                                                                      uint64_t end) {
+    LOG(INFO) << "_segid_statistics_map clear record segid range from:" << begin << " to:" << end;
+    for (int i = begin; i <= end; ++i) {
+        _segid_statistics_map.erase(i);
+    }
+}
+
+Status BetaRowsetWriter::_check_correctness(std::unique_ptr<OlapReaderStatistics> stat,
+                                            uint64_t merged_row_stat, uint64_t row_count,
+                                            uint64_t begin, uint64_t end) {
+    uint64_t stat_read_row = stat->raw_rows_read;
+    uint64_t sum_target_row = 0;
+
+    {
+        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
+        for (int i = begin; i <= end; ++i) {
+            sum_target_row += _segid_statistics_map[i].row_num;
+        }
+    }
+
+    if (sum_target_row != stat_read_row) {
+        LOG(WARNING) << "read row_num does not match. expect read row:" << sum_target_row
+                     << " actual read row:" << stat_read_row;
+        return Status::OLAPInternalError(OLAP_ERR_CHECK_LINES_ERROR);
+    }
+
+    uint64_t total_row = row_count + merged_row_stat;
+    if (stat_read_row != total_row) {
+        LOG(WARNING) << "total row_num does not match. expect total row:" << total_row
+                     << " actual total row:" << stat_read_row;
+        return Status::OLAPInternalError(OLAP_ERR_CHECK_LINES_ERROR);
+    }
+    return Status::OK();
+}
+
+Status BetaRowsetWriter::_do_segcompaction(SegCompactionCandidatesSharedPtr segments) {
+    SCOPED_ATTACH_TASK(StorageEngine::instance()->segcompaction_mem_tracker(),
+                       ThreadContext::TaskType::COMPACTION);
+    // throttle segcompaction task if memory depleted.
+    if (MemTrackerLimiter::sys_mem_exceed_limit_check(GB_EXCHANGE_BYTE)) {
+        LOG(WARNING) << "skip segcompaction due to memory shortage";
+        return Status::MemoryLimitExceeded("skip segcompaction due to memory shortage");
+    }
+    uint64_t begin = (*(segments->begin()))->id();
+    uint64_t end = (*(segments->end() - 1))->id();
+    LOG(INFO) << "BetaRowsetWriter:" << this << " do segcompaction at " << segments->size()
+              << " segments. Begin:" << begin << " End:" << end;
+    uint64_t begin_time = GetCurrentTimeMicros();
+
+    auto schema = std::make_shared<Schema>(_context.tablet_schema->columns(),
+                                           _context.tablet_schema->columns().size());
+    std::unique_ptr<OlapReaderStatistics> stat(new OlapReaderStatistics());
+    uint64_t merged_row_stat = 0;
+    vectorized::VMergeIterator* reader =
+            _get_segcompaction_reader(segments, schema, stat.get(), &merged_row_stat);
+    if (UNLIKELY(reader == nullptr)) {
+        LOG(WARNING) << "failed to get segcompaction reader";
+        return Status::OLAPInternalError(OLAP_ERR_SEGCOMPACTION_INIT_READER);
+    }
+    std::unique_ptr<vectorized::VMergeIterator> reader_ptr;
+    reader_ptr.reset(reader);
+    auto writer = _create_segcompaction_writer(begin, end);
+    if (UNLIKELY(writer == nullptr)) {
+        LOG(WARNING) << "failed to get segcompaction writer";
+        return Status::OLAPInternalError(OLAP_ERR_SEGCOMPACTION_INIT_WRITER);
+    }
+    uint64_t row_count = 0;
+    vectorized::Block block = _context.tablet_schema->create_block();
+    while (true) {
+        auto status = reader_ptr->next_batch(&block);
+        row_count += block.rows();
+        if (status != Status::OK()) {
+            if (LIKELY(status.is_end_of_file())) {
+                RETURN_NOT_OK_LOG(_add_block_for_segcompaction(&block, &writer),
+                                  "write block failed");
+                break;
+            } else {
+                LOG(WARNING) << "read block failed: " << status.to_string();
+                return status;
+            }
+        }
+        RETURN_NOT_OK_LOG(_add_block_for_segcompaction(&block, &writer), "write block failed");
+        block.clear_column_data();
+    }
+    RETURN_NOT_OK_LOG(_check_correctness(std::move(stat), merged_row_stat, row_count, begin, end),
+                      "check correctness failed");
+    {
+        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
+        _clear_statistics_for_deleting_segments_unsafe(begin, end);
+    }
+    RETURN_NOT_OK(_flush_segment_writer(&writer));
+    RETURN_NOT_OK(_delete_original_segments(begin, end));
+    RETURN_NOT_OK(_rename_compacted_segments(begin, end));
+
+    if (VLOG_DEBUG_IS_ON) {
+        std::stringstream ss;
+        for (const auto& entry : std::filesystem::directory_iterator(_context.tablet_path)) {
+            ss << "[" << entry.path() << "]";
+        }
+        VLOG_DEBUG << "_segcompacted_point:" << _segcompacted_point
+                   << " _num_segment:" << _num_segment << " _num_segcompacted:" << _num_segcompacted
+                   << " list directory:" << ss.str();
+    }
+
+    _segcompacted_point += (end - begin + 1);
+
+    uint64_t elapsed = GetCurrentTimeMicros() - begin_time;
+    LOG(INFO) << "BetaRowsetWriter:" << this << " segcompaction completed. elapsed time:" << elapsed
+              << "us. _segcompacted_point update:" << _segcompacted_point;
+
+    return Status::OK();
+}
+
+void BetaRowsetWriter::segcompaction(SegCompactionCandidatesSharedPtr segments) {
+    Status status = _do_segcompaction(segments);
+    if (!status.ok()) {
+        int16_t errcode = status.precise_code();
+        switch (errcode) {
+        case OLAP_ERR_SEGCOMPACTION_INIT_READER:
+        case OLAP_ERR_SEGCOMPACTION_INIT_WRITER:
+            LOG(WARNING) << "segcompaction failed, try next time:" << status;
+            return;
+        default:
+            LOG(WARNING) << "segcompaction fatal, terminating the write job:" << status;
+            // status will be checked by the next trigger of segcompaction or the final wait
+            _segcompaction_status.store(OLAP_ERR_OTHER_ERROR);
+        }
+    }
+    DCHECK_EQ(_is_doing_segcompaction, true);
+    _is_doing_segcompaction = false;
+    _segcompacting_cond.notify_all();
+}
+
+Status BetaRowsetWriter::_load_noncompacted_segments(
+        std::vector<segment_v2::SegmentSharedPtr>* segments) {
+    auto fs = _rowset_meta->fs();
+    if (!fs) {
+        return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
+    }
+    for (int seg_id = _segcompacted_point; seg_id < _num_segment; ++seg_id) {
+        auto seg_path =
+                BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, seg_id);
+        auto cache_path =
+                BetaRowset::local_cache_path(_context.tablet_path, _context.rowset_id, seg_id);
+        std::shared_ptr<segment_v2::Segment> segment;
+        auto s = segment_v2::Segment::open(fs, seg_path, cache_path, seg_id, _context.tablet_schema,
+                                           &segment);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to open segment. " << seg_path << ":" << s.to_string();
+            return Status::OLAPInternalError(OLAP_ERR_ROWSET_LOAD_FAILED);
+        }
+        segments->push_back(std::move(segment));
+    }
+    return Status::OK();
+}
+
+/* policy of segcompaction target selection:
+ *  1. skip big segments
+ *  2. if the consecutive smalls end up with a big, compact the smalls, except
+ *     single small
+ *  3. if the consecutive smalls end up with small, compact the smalls if the
+ *     length is beyond (config::segcompaction_threshold_segment_num / 2)
+ */
+Status BetaRowsetWriter::_find_longest_consecutive_small_segment(
+        SegCompactionCandidatesSharedPtr segments) {
+    std::vector<segment_v2::SegmentSharedPtr> all_segments;
+    RETURN_NOT_OK(_load_noncompacted_segments(&all_segments));
+
+    std::stringstream ss_all;
+    for (auto& segment : all_segments) {
+        ss_all << "[id:" << segment->id() << " num_rows:" << segment->num_rows() << "]";
+    }
+    LOG(INFO) << "all noncompacted segments num:" << all_segments.size()
+              << " list of segments:" << ss_all.str();
+
+    bool is_terminated_by_big = false;
+    bool let_big_terminate = false;
+    size_t small_threshold = config::segcompaction_small_threshold;
+    for (int64_t i = 0; i < all_segments.size(); ++i) {
+        segment_v2::SegmentSharedPtr seg = all_segments[i];
+        if (seg->num_rows() > small_threshold) {
+            if (let_big_terminate) {
+                is_terminated_by_big = true;
+                break;
+            } else {
+                RETURN_NOT_OK(_rename_compacted_segment_plain(_segcompacted_point++));
+            }
+        } else {
+            let_big_terminate = true; // break if find a big after small
+            segments->push_back(seg);
+        }
+    }
+    size_t s = segments->size();
+    if (!is_terminated_by_big && s <= (config::segcompaction_threshold_segment_num / 2)) {
+        // start with big segments and end with small, better to do it in next
+        // round to compact more at once
+        segments->clear();
+        return Status::OK();
+    }
+    if (s == 1) { // poor bachelor, let it go
+        LOG(INFO) << "only one candidate segment";
+        RETURN_NOT_OK(_rename_compacted_segment_plain(_segcompacted_point++));
+        segments->clear();
+        return Status::OK();
+    }
+    std::stringstream ss;
+    for (auto& segment : (*segments.get())) {
+        ss << "[id:" << segment->id() << " num_rows:" << segment->num_rows() << "]";
+    }
+    LOG(INFO) << "candidate segments num:" << s << " list of candidates:" << ss.str();

Review Comment:
   maybe DEBUG?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org