You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/05/18 03:29:22 UTC

[incubator-doris] branch master updated: [fixbug](vec-load) fix core of segment_writer while it is not thread-safe (#9569)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c9ab5e22fe [fixbug](vec-load) fix core of segment_writer while it is not thread-safe (#9569)
c9ab5e22fe is described below

commit c9ab5e22fe2e3ef90fbf224dbb358ee01d08b2f5
Author: yixiutt <10...@users.noreply.github.com>
AuthorDate: Wed May 18 11:29:15 2022 +0800

    [fixbug](vec-load) fix core of segment_writer while it is not thread-safe (#9569)
    
    Introduced in stream-load-vec #9280: BetaRowset enables multi-threaded
    memtable flush, and each memtable flush calls rowset_writer.add_block,
    which writes through the shared member variable _segment_writer. As a
    result, several threads can operate on the same segment writer at once,
    which is not thread-safe and can crash the BE.
    
    Co-authored-by: yixiutt <yi...@selectdb.com>
---
 be/src/olap/memtable.cpp                  |  6 ++++--
 be/src/olap/rowset/beta_rowset_writer.cpp | 26 +++++++++++++++++++++-----
 be/src/olap/rowset/beta_rowset_writer.h   |  3 +++
 be/src/olap/rowset/rowset_writer.h        |  3 +++
 4 files changed, 31 insertions(+), 7 deletions(-)

diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 3a98d2bfec..769169a8ac 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -302,9 +302,11 @@ Status MemTable::_do_flush(int64_t& duration_ns) {
         }
     } else {
         vectorized::Block block = _collect_vskiplist_results();
-        RETURN_NOT_OK(_rowset_writer->add_block(&block));
+        // Beta rowset memtables may be flushed in parallel, and appending to a
+        // SegmentWriter is not thread-safe, so flush through a temporary segment
+        // writer instead of the shared member _segment_writer.
+        RETURN_NOT_OK(_rowset_writer->flush_single_memtable(&block));
         _flush_size = block.allocated_bytes();
-        RETURN_NOT_OK(_rowset_writer->flush());
     }
     return Status::OK();
 }
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index d1dda3e89d..f0891234d2 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -104,23 +104,28 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) {
     if (UNLIKELY(_segment_writer == nullptr)) {
         RETURN_NOT_OK(_create_segment_writer(&_segment_writer));
     }
+    return _add_block(block, &_segment_writer);
+}
+
+Status BetaRowsetWriter::_add_block(const vectorized::Block* block,
+                                    std::unique_ptr<segment_v2::SegmentWriter>* segment_writer) {
     size_t block_size_in_bytes = block->bytes();
     size_t block_row_num = block->rows();
     size_t row_avg_size_in_bytes = std::max((size_t)1, block_size_in_bytes / block_row_num);
     size_t row_offset = 0;
 
     do {
-        auto max_row_add = _segment_writer->max_row_to_add(row_avg_size_in_bytes);
+        auto max_row_add = (*segment_writer)->max_row_to_add(row_avg_size_in_bytes);
         if (UNLIKELY(max_row_add < 1)) {
             // no space for another signle row, need flush now
-            RETURN_NOT_OK(_flush_segment_writer(&_segment_writer));
-            RETURN_NOT_OK(_create_segment_writer(&_segment_writer));
-            max_row_add = _segment_writer->max_row_to_add(row_avg_size_in_bytes);
+            RETURN_NOT_OK(_flush_segment_writer(segment_writer));
+            RETURN_NOT_OK(_create_segment_writer(segment_writer));
+            max_row_add = (*segment_writer)->max_row_to_add(row_avg_size_in_bytes);
             DCHECK(max_row_add > 0);
         }
 
         size_t input_row_num = std::min(block_row_num - row_offset, size_t(max_row_add));
-        auto s = _segment_writer->append_block(block, row_offset, input_row_num);
+        auto s = (*segment_writer)->append_block(block, row_offset, input_row_num);
         if (UNLIKELY(!s.ok())) {
             LOG(WARNING) << "failed to append block: " << s.to_string();
             return Status::OLAPInternalError(OLAP_ERR_WRITER_DATA_WRITE_ERROR);
@@ -250,6 +255,17 @@ Status BetaRowsetWriter::flush_single_memtable(MemTable* memtable, int64_t* flus
     return Status::OK();
 }
 
+Status BetaRowsetWriter::flush_single_memtable(const vectorized::Block* block) {
+    if (block->rows() == 0) {
+        return Status::OK();
+    }
+    std::unique_ptr<segment_v2::SegmentWriter> writer;
+    RETURN_NOT_OK(_create_segment_writer(&writer));
+    RETURN_NOT_OK(_add_block(block, &writer));
+    RETURN_NOT_OK(_flush_segment_writer(&writer));
+    return Status::OK();
+}
+
 RowsetSharedPtr BetaRowsetWriter::build() {
     // TODO(lingbin): move to more better place, or in a CreateBlockBatch?
     for (auto& wblock : _wblocks) {
diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h
index 53c3496e05..570a8f7650 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -57,6 +57,7 @@ public:
 
     // Return the file size flushed to disk in "flush_size"
     Status flush_single_memtable(MemTable* memtable, int64_t* flush_size) override;
+    Status flush_single_memtable(const vectorized::Block* block) override;
 
     RowsetSharedPtr build() override;
 
@@ -71,6 +72,8 @@ public:
 private:
     template <typename RowType>
     Status _add_row(const RowType& row);
+    Status _add_block(const vectorized::Block* block,
+                      std::unique_ptr<segment_v2::SegmentWriter>* writer);
 
     Status _create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer);
 
diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h
index b349743a44..22239f4eaf 100644
--- a/be/src/olap/rowset/rowset_writer.h
+++ b/be/src/olap/rowset/rowset_writer.h
@@ -64,6 +64,9 @@ public:
     virtual Status flush_single_memtable(MemTable* memtable, int64_t* flush_size) {
         return Status::OLAPInternalError(OLAP_ERR_FUNC_NOT_IMPLEMENTED);
     }
+    virtual Status flush_single_memtable(const vectorized::Block* block) {
+        return Status::OLAPInternalError(OLAP_ERR_FUNC_NOT_IMPLEMENTED);
+    }
 
     // finish building and return pointer to the built rowset (guaranteed to be inited).
     // return nullptr when failed


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org