Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2023/01/04 13:01:54 UTC

[GitHub] [doris] jacktengg opened a new pull request, #15624: [feature] support spill to disk for sort node

jacktengg opened a new pull request, #15624:
URL: https://github.com/apache/doris/pull/15624

   # Proposed changes
   
   Issue Number: close #xxx
   
   ## Problem summary
   
   Describe your changes.
   
   ## Checklist(Required)
   
   1. Does it affect the original behavior: 
       - [ ] Yes
       - [ ] No
       - [ ] I don't know
   2. Have unit tests been added:
       - [ ] Yes
       - [ ] No
       - [ ] No Need
   3. Has documentation been added or modified:
       - [ ] Yes
       - [ ] No
       - [ ] No Need
   4. Does it need to update dependencies:
       - [ ] Yes
       - [ ] No
   5. Are there any changes that cannot be rolled back:
       - [ ] Yes (If Yes, please explain WHY)
       - [ ] No
   
   ## Further comments
   
   If this is a relatively large or complex change, kick off the discussion at [dev@doris.apache.org](mailto:dev@doris.apache.org) by explaining why you chose the solution you did and what alternatives you considered, etc...
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] yiguolei commented on a diff in pull request #15624: [feature](spill to disk) Support spill to disk for sort node

Posted by GitBox <gi...@apache.org>.
yiguolei commented on code in PR #15624:
URL: https://github.com/apache/doris/pull/15624#discussion_r1062145039


##########
be/src/vec/common/sort/sorter.cpp:
##########
@@ -17,48 +17,154 @@
 
 #include "vec/common/sort/sorter.h"
 
+#include "runtime/block_spill_manager.h"
 #include "runtime/thread_context.h"
 
 namespace doris::vectorized {
 
-void MergeSorterState::build_merge_tree(SortDescription& sort_description) {
-    for (const auto& block : sorted_blocks) {
-        cursors.emplace_back(block, sort_description);
+// When doing spillable sorting, each sorted block is spilled into a single file.
+//
+// In order to decrease memory pressure when merging
+// multiple spilled blocks into one bigger sorted block, only part
+// of each spilled blocks are read back into memory at a time.
+//
+// Currently the spilled blocks are splitted into small sub blocks,
+// each sub block is serialized in PBlock format and appended
+// to the spill file.
+//
+// This number specifies the maximum size of sub blocks
+static constexpr int BLOCK_SPILL_BATCH_BYTES = 8 * 1024 * 1024;
+
+Status MergeSorterState::add_sorted_block(Block& block) {
+    auto rows = block.rows();
+    if (0 == rows) {
+        return Status::OK();
+    }
+    if (0 == avg_row_bytes_) {
+        avg_row_bytes_ = block.bytes() / rows;
+        spill_block_batch_size_ = (BLOCK_SPILL_BATCH_BYTES + avg_row_bytes_ - 1) / avg_row_bytes_;
     }
 
-    if (sorted_blocks.size() > 1) {
-        for (auto& cursor : cursors) priority_queue.push(MergeSortCursor(&cursor));
+    auto bytes_used = data_size();
+    if (is_spilled_ ||
+        (config::external_sort_bytes_threshold > 0 &&
+         (bytes_used + block.allocated_bytes()) >=
+                 (config::external_sort_bytes_threshold - BLOCK_SPILL_BATCH_BYTES))) {
+        is_spilled_ = true;
+        BlockSpillWriterUPtr spill_block_wirter;
+        RETURN_IF_ERROR(ExecEnv::GetInstance()->block_spill_mgr()->get_writer(
+                spill_block_batch_size_, spill_block_wirter));
+
+        RETURN_IF_ERROR(spill_block_wirter->write(block));
+        spilled_sorted_block_streams_.emplace_back(spill_block_wirter->get_id());

Review Comment:
   It is too small: BUFFERED_BLOCK_BYTES is 16MB, so there may be a lot of small files.
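Here is a worked example of the sizing arithmetic in `add_sorted_block`. The first non-empty block fixes `avg_row_bytes_`, and each spilled sub-block then holds ceil(BLOCK_SPILL_BATCH_BYTES / avg_row_bytes_) rows. The sketch below reproduces that calculation with the 8 MiB constant from the diff. `spill_batch_rows` is a hypothetical name. The clamp to 1 byte per row is an added guard that is not part of the PR: the truncating division `block.bytes() / rows` could in principle yield 0, and the PR's expression would then divide by zero.

```cpp
#include <cassert>
#include <cstdint>

// Mirrors the constant in the diff: maximum serialized size of one spilled sub-block.
constexpr int64_t kBlockSpillBatchBytes = 8 * 1024 * 1024;

// Rows per spilled sub-block, rounded up (ceiling division) as in the diff.
// The clamp to 1 is a hypothetical guard against a truncated average of 0.
int64_t spill_batch_rows(int64_t block_bytes, int64_t block_rows) {
    assert(block_rows > 0);  // add_sorted_block returns early for empty blocks
    int64_t avg_row_bytes = block_bytes / block_rows;
    if (avg_row_bytes == 0) {
        avg_row_bytes = 1;
    }
    return (kBlockSpillBatchBytes + avg_row_bytes - 1) / avg_row_bytes;
}
```

With 100-byte rows this gives 83887 rows per sub-block. The constant only bounds how much of each stream is read back at a time. Per the code comment, every spilled sorted block still gets its own file, which is where the small-file concern comes from.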





[GitHub] [doris] yiguolei commented on a diff in pull request #15624: [feature](spill to disk) Support spill to disk for sort node

Posted by GitBox <gi...@apache.org>.
yiguolei commented on code in PR #15624:
URL: https://github.com/apache/doris/pull/15624#discussion_r1062200906


##########
be/src/vec/common/sort/sorter.cpp:
##########
@@ -17,48 +17,154 @@
 
 #include "vec/common/sort/sorter.h"
 
+#include "runtime/block_spill_manager.h"
 #include "runtime/thread_context.h"
 
 namespace doris::vectorized {
 
-void MergeSorterState::build_merge_tree(SortDescription& sort_description) {
-    for (const auto& block : sorted_blocks) {
-        cursors.emplace_back(block, sort_description);
+// When doing spillable sorting, each sorted block is spilled into a single file.
+//
+// In order to decrease memory pressure when merging
+// multiple spilled blocks into one bigger sorted block, only part
+// of each spilled blocks are read back into memory at a time.
+//
+// Currently the spilled blocks are splitted into small sub blocks,
+// each sub block is serialized in PBlock format and appended
+// to the spill file.
+//
+// This number specifies the maximum size of sub blocks
+static constexpr int BLOCK_SPILL_BATCH_BYTES = 8 * 1024 * 1024;
+
+Status MergeSorterState::add_sorted_block(Block& block) {
+    auto rows = block.rows();
+    if (0 == rows) {
+        return Status::OK();
+    }
+    if (0 == avg_row_bytes_) {
+        avg_row_bytes_ = block.bytes() / rows;
+        spill_block_batch_size_ = (BLOCK_SPILL_BATCH_BYTES + avg_row_bytes_ - 1) / avg_row_bytes_;
     }
 
-    if (sorted_blocks.size() > 1) {
-        for (auto& cursor : cursors) priority_queue.push(MergeSortCursor(&cursor));
+    auto bytes_used = data_size();
+    if (is_spilled_ ||
+        (config::external_sort_bytes_threshold > 0 &&
+         (bytes_used + block.allocated_bytes()) >=
+                 (config::external_sort_bytes_threshold - BLOCK_SPILL_BATCH_BYTES))) {
+        is_spilled_ = true;
+        BlockSpillWriterUPtr spill_block_wirter;
+        RETURN_IF_ERROR(ExecEnv::GetInstance()->block_spill_mgr()->get_writer(
+                spill_block_batch_size_, spill_block_wirter));
+
+        RETURN_IF_ERROR(spill_block_wirter->write(block));
+        spilled_sorted_block_streams_.emplace_back(spill_block_wirter->get_id());
+
+        spill_block_wirter->close();
+
+        if (init_merge_sorted_block_) {
+            init_merge_sorted_block_ = false;
+            merge_sorted_block_ = block.clone_empty();
+        }
+    } else {
+        sorted_blocks_.emplace_back(std::move(block));
     }
+    num_rows_ += rows;
+    return Status::OK();
+}
+
+Status MergeSorterState::_build_merge_tree_not_spilled(const SortDescription& sort_description) {
+    for (const auto& block : sorted_blocks_) {
+        cursors_.emplace_back(block, sort_description);
+    }
+
+    if (sorted_blocks_.size() > 1) {
+        for (auto& cursor : cursors_) priority_queue_.push(MergeSortCursor(&cursor));
+    }
+    return Status::OK();
+}
+
+Status MergeSorterState::build_merge_tree(const SortDescription& sort_description) {
+    _build_merge_tree_not_spilled(sort_description);
+
+    if (spilled_sorted_block_streams_.size() > 0) {
+        if (sorted_blocks_.size() > 0) {
+            BlockSpillWriterUPtr spill_block_wirter;
+            RETURN_IF_ERROR(ExecEnv::GetInstance()->block_spill_mgr()->get_writer(
+                    spill_block_batch_size_, spill_block_wirter));
+
+            LOG(WARNING) << "spill to disk: build merge tree, in memory sorted blocks: "
+                         << sorted_blocks_.size();
+            if (sorted_blocks_.size() == 1) {
+                RETURN_IF_ERROR(spill_block_wirter->write(sorted_blocks_[0]));
+            } else {
+                bool eos = false;
+
+                // merge blocks in memory and write merge result to disk
+                while (!eos) {
+                    merge_sorted_block_.clear_column_data();
+                    _merge_sort_read_not_spilled(spill_block_batch_size_, &merge_sorted_block_,
+                                                 &eos);
+                    RETURN_IF_ERROR(spill_block_wirter->write(merge_sorted_block_));
+                }
+            }
+            spilled_sorted_block_streams_.emplace_back(spill_block_wirter->get_id());
+            spill_block_wirter->close();
+        }
+        RETURN_IF_ERROR(_merge_spilled_blocks(sort_description));
+    }
+    return Status::OK();
 }
 
 Status MergeSorterState::merge_sort_read(doris::RuntimeState* state,
                                          doris::vectorized::Block* block, bool* eos) {
-    size_t num_columns = sorted_blocks[0].columns();
+    Status status = Status::OK();
+    if (is_spilled_) {
+        status = merger_->get_next(block, eos);
+        LOG(WARNING) << "spill to disk: merger_->get_next, eos: " << *eos;
+    } else {
+        LOG(WARNING) << "spill to disk: _merge_sort_read_not_spilled, eos: " << *eos;
+        if (sorted_blocks_.empty()) {
+            *eos = true;
+        } else if (sorted_blocks_.size() == 1) {
+            if (offset_ != 0) {
+                sorted_blocks_[0].skip_num_rows(offset_);
+            }
+            block->swap(sorted_blocks_[0]);
+            *eos = true;
+        } else {
+            status = _merge_sort_read_not_spilled(state->batch_size(), block, eos);
+        }
+    }
+    return status;

Review Comment:
   return Status::OK();
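Here is some background on the merge tree used above. `_build_merge_tree_not_spilled` pushes one `MergeSortCursor` per in-memory sorted block into `priority_queue_`. Rows are then popped in global order, up to a batch size per call. The following is a simplified, standalone sketch of that k-way merge over sorted `int` vectors using `std::priority_queue`. `KWayMerger` and its `get_next` are hypothetical names that only echo the shape of the Doris calls; this is not Doris's cursor implementation.

```cpp
#include <cassert>
#include <cstddef>
#include <queue>
#include <utility>
#include <vector>

// Simplified k-way merge: each run is an already-sorted vector standing in
// for one sorted Block; a cursor tracks (current value, run index, row index).
class KWayMerger {
public:
    explicit KWayMerger(std::vector<std::vector<int>> runs) : runs_(std::move(runs)) {
        for (size_t r = 0; r < runs_.size(); ++r) {
            if (!runs_[r].empty()) heap_.push({runs_[r][0], r, 0});
        }
    }

    // Replaces *out with up to batch_size rows in ascending order;
    // sets *eos once every run has been drained.
    void get_next(size_t batch_size, std::vector<int>* out, bool* eos) {
        out->clear();
        while (out->size() < batch_size && !heap_.empty()) {
            Cursor top = heap_.top();
            heap_.pop();
            out->push_back(top.value);
            size_t next = top.row + 1;
            if (next < runs_[top.run].size()) {
                heap_.push({runs_[top.run][next], top.run, next});
            }
        }
        *eos = heap_.empty();
    }

private:
    struct Cursor {
        int value;
        size_t run;
        size_t row;
        // Inverted so std::priority_queue (a max-heap) yields the smallest value first.
        bool operator<(const Cursor& other) const { return value > other.value; }
    };
    std::vector<std::vector<int>> runs_;
    std::priority_queue<Cursor> heap_;
};
```

The spilled path has the same structure. According to the comment in the diff, each stream reads back only part of its spilled block at a time, so memory use tracks the number of streams being merged rather than the total data size.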



##########
be/src/vec/common/sort/sorter.cpp:
##########
@@ -17,48 +17,154 @@
 
 #include "vec/common/sort/sorter.h"
 
+#include "runtime/block_spill_manager.h"
 #include "runtime/thread_context.h"
 
 namespace doris::vectorized {
 
-void MergeSorterState::build_merge_tree(SortDescription& sort_description) {
-    for (const auto& block : sorted_blocks) {
-        cursors.emplace_back(block, sort_description);
+// When doing spillable sorting, each sorted block is spilled into a single file.
+//
+// In order to decrease memory pressure when merging
+// multiple spilled blocks into one bigger sorted block, only part
+// of each spilled blocks are read back into memory at a time.
+//
+// Currently the spilled blocks are splitted into small sub blocks,
+// each sub block is serialized in PBlock format and appended
+// to the spill file.
+//
+// This number specifies the maximum size of sub blocks
+static constexpr int BLOCK_SPILL_BATCH_BYTES = 8 * 1024 * 1024;
+
+Status MergeSorterState::add_sorted_block(Block& block) {
+    auto rows = block.rows();
+    if (0 == rows) {
+        return Status::OK();
+    }
+    if (0 == avg_row_bytes_) {
+        avg_row_bytes_ = block.bytes() / rows;
+        spill_block_batch_size_ = (BLOCK_SPILL_BATCH_BYTES + avg_row_bytes_ - 1) / avg_row_bytes_;
     }
 
-    if (sorted_blocks.size() > 1) {
-        for (auto& cursor : cursors) priority_queue.push(MergeSortCursor(&cursor));
+    auto bytes_used = data_size();
+    if (is_spilled_ ||
+        (config::external_sort_bytes_threshold > 0 &&
+         (bytes_used + block.allocated_bytes()) >=
+                 (config::external_sort_bytes_threshold - BLOCK_SPILL_BATCH_BYTES))) {
+        is_spilled_ = true;
+        BlockSpillWriterUPtr spill_block_wirter;
+        RETURN_IF_ERROR(ExecEnv::GetInstance()->block_spill_mgr()->get_writer(
+                spill_block_batch_size_, spill_block_wirter));
+
+        RETURN_IF_ERROR(spill_block_wirter->write(block));
+        spilled_sorted_block_streams_.emplace_back(spill_block_wirter->get_id());
+
+        spill_block_wirter->close();
+
+        if (init_merge_sorted_block_) {
+            init_merge_sorted_block_ = false;
+            merge_sorted_block_ = block.clone_empty();
+        }
+    } else {
+        sorted_blocks_.emplace_back(std::move(block));
     }
+    num_rows_ += rows;
+    return Status::OK();
+}
+
+Status MergeSorterState::_build_merge_tree_not_spilled(const SortDescription& sort_description) {
+    for (const auto& block : sorted_blocks_) {
+        cursors_.emplace_back(block, sort_description);
+    }
+
+    if (sorted_blocks_.size() > 1) {
+        for (auto& cursor : cursors_) priority_queue_.push(MergeSortCursor(&cursor));
+    }
+    return Status::OK();
+}
+
+Status MergeSorterState::build_merge_tree(const SortDescription& sort_description) {
+    _build_merge_tree_not_spilled(sort_description);
+
+    if (spilled_sorted_block_streams_.size() > 0) {
+        if (sorted_blocks_.size() > 0) {
+            BlockSpillWriterUPtr spill_block_wirter;
+            RETURN_IF_ERROR(ExecEnv::GetInstance()->block_spill_mgr()->get_writer(
+                    spill_block_batch_size_, spill_block_wirter));
+
+            LOG(WARNING) << "spill to disk: build merge tree, in memory sorted blocks: "
+                         << sorted_blocks_.size();
+            if (sorted_blocks_.size() == 1) {
+                RETURN_IF_ERROR(spill_block_wirter->write(sorted_blocks_[0]));
+            } else {
+                bool eos = false;
+
+                // merge blocks in memory and write merge result to disk
+                while (!eos) {
+                    merge_sorted_block_.clear_column_data();
+                    _merge_sort_read_not_spilled(spill_block_batch_size_, &merge_sorted_block_,
+                                                 &eos);
+                    RETURN_IF_ERROR(spill_block_wirter->write(merge_sorted_block_));
+                }
+            }
+            spilled_sorted_block_streams_.emplace_back(spill_block_wirter->get_id());
+            spill_block_wirter->close();
+        }
+        RETURN_IF_ERROR(_merge_spilled_blocks(sort_description));
+    }
+    return Status::OK();
 }
 
 Status MergeSorterState::merge_sort_read(doris::RuntimeState* state,
                                          doris::vectorized::Block* block, bool* eos) {
-    size_t num_columns = sorted_blocks[0].columns();
+    Status status = Status::OK();
+    if (is_spilled_) {
+        status = merger_->get_next(block, eos);
+        LOG(WARNING) << "spill to disk: merger_->get_next, eos: " << *eos;
+    } else {
+        LOG(WARNING) << "spill to disk: _merge_sort_read_not_spilled, eos: " << *eos;
+        if (sorted_blocks_.empty()) {
+            *eos = true;
+        } else if (sorted_blocks_.size() == 1) {
+            if (offset_ != 0) {
+                sorted_blocks_[0].skip_num_rows(offset_);
+            }
+            block->swap(sorted_blocks_[0]);
+            *eos = true;
+        } else {
+            status = _merge_sort_read_not_spilled(state->batch_size(), block, eos);

Review Comment:
   Should this return if there is an error?
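Both review comments on `merge_sort_read` point to the usual early-return style. A failed `Status` from `_merge_sort_read_not_spilled` should be propagated immediately instead of being stored in a local, and the function should end with an explicit `return Status::OK();`. Below is a minimal sketch of that pattern. The `Status` class, the `RETURN_IF_ERROR` macro and the `read_step` helper are simplified hypothetical stand-ins, not Doris's definitions.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Minimal hypothetical stand-in for a status type.
class Status {
public:
    static Status OK() { return Status(true, ""); }
    static Status InternalError(std::string msg) { return Status(false, std::move(msg)); }
    bool ok() const { return ok_; }
    const std::string& msg() const { return msg_; }

private:
    Status(bool ok, std::string msg) : ok_(ok), msg_(std::move(msg)) {}
    bool ok_;
    std::string msg_;
};

// Evaluates the expression once and returns its Status from the caller on failure.
#define RETURN_IF_ERROR(stmt)               \
    do {                                    \
        Status st_ = (stmt);                \
        if (!st_.ok()) return st_;          \
    } while (false)

// Hypothetical step standing in for _merge_sort_read_not_spilled().
Status read_step(bool fail, bool* eos) {
    if (fail) return Status::InternalError("merge failed");
    *eos = true;
    return Status::OK();
}

// The shape suggested in review: fail fast, then return OK explicitly.
Status merge_sort_read_sketch(bool fail, bool* eos) {
    *eos = false;
    RETURN_IF_ERROR(read_step(fail, eos));
    return Status::OK();
}
```

The `do { ... } while (false)` wrapper makes the macro behave as a single statement, for example after an unbraced `if`.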





[GitHub] [doris] yiguolei commented on a diff in pull request #15624: [feature](spill to disk) Support spill to disk for sort node

Posted by GitBox <gi...@apache.org>.
yiguolei commented on code in PR #15624:
URL: https://github.com/apache/doris/pull/15624#discussion_r1064254237


##########
be/src/runtime/runtime_state.h:
##########
@@ -416,6 +416,18 @@ class RuntimeState {
 #endif
     }
 
+    static constexpr int64_t MIN_EXTERNAL_SORT_BYTES_THRESHOLD = 128 << 20;

Review Comment:
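   Move this parameter check to the FE; don't check it in the BE.
   Checking it in the FE has two benefits:
   1. Users see the error message directly and know whether the setting took effect. If the check fails on the BE, an error effectively occurs but the user never learns of it.
   2. For fuzzy testing, the FE can set a small value when fuzzy mode is enabled, to deliberately trigger spill to disk.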
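The check the reviewer wants moved is small enough to state as code. The Doris FE is written in Java, so this C++ sketch only illustrates the rule, not where it would live. The function name, the message text and the fuzzy-mode exemption are assumptions drawn from the comment above. The sketch reuses the 128 << 20 byte minimum from `runtime_state.h` and treats values <= 0 as "spill disabled", matching the `> 0` test in `sorter.cpp`.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <string>

// Lower bound from the diff (runtime_state.h): 128 << 20 bytes = 128 MiB.
constexpr int64_t kMinExternalSortBytesThreshold = int64_t{128} << 20;

// Hypothetical rule: <= 0 disables spilling; a positive value below the minimum
// is rejected with a user-visible message, except in fuzzy-test mode, where
// small values are wanted to force spilling. Returns nullopt when accepted.
std::optional<std::string> validate_external_sort_threshold(int64_t bytes, bool fuzzy_mode) {
    if (bytes <= 0 || fuzzy_mode || bytes >= kMinExternalSortBytesThreshold) {
        return std::nullopt;
    }
    return "external sort bytes threshold must be 0 (disabled) or at least " +
           std::to_string(kMinExternalSortBytesThreshold) + " bytes";
}
```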



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] yiguolei merged pull request #15624: [feature](spill to disk) Support spill to disk for sort node

Posted by GitBox <gi...@apache.org>.
yiguolei merged PR #15624:
URL: https://github.com/apache/doris/pull/15624




[GitHub] [doris] yiguolei commented on a diff in pull request #15624: [feature](spill to disk) Support spill to disk for sort node

Posted by GitBox <gi...@apache.org>.
yiguolei commented on code in PR #15624:
URL: https://github.com/apache/doris/pull/15624#discussion_r1062205026


##########
be/src/vec/common/sort/sorter.cpp:
##########
@@ -67,13 +173,71 @@ Status MergeSorterState::merge_sort_read(doris::RuntimeState* state,
     }
 
     if (!mem_reuse) {
-        Block merge_block = sorted_blocks[0].clone_with_columns(std::move(merged_columns));
+        Block merge_block = sorted_blocks_[0].clone_with_columns(std::move(merged_columns));
         merge_block.swap(*block);
     }
 
     return Status::OK();
 }
 
+int MergeSorterState::_calc_spill_blocks_to_merge() const {
+    return config::external_sort_bytes_threshold / BLOCK_SPILL_BATCH_BYTES;
+}
+
+// merge all the intermediate spilled blocks
+Status MergeSorterState::_merge_spilled_blocks(const SortDescription& sort_description) {
+    int num_of_blocks_to_merge = _calc_spill_blocks_to_merge();
+    while (true) {
+        // pick some spilled blocks to merge, and spill the merged result
+        // to disk, until all splled blocks can be merged in a run.
+        RETURN_IF_ERROR(_create_intermediate_merger(num_of_blocks_to_merge, sort_description));
+        if (spilled_sorted_block_streams_.empty()) {
+            LOG(WARNING) << "spill to disk: _merge_spilled_blocks, can merger all spilled block in "
+                            "memory";
+            break;
+        }
+
+        bool eos = false;
+
+        BlockSpillWriterUPtr spill_block_wirter;
+        RETURN_IF_ERROR(ExecEnv::GetInstance()->block_spill_mgr()->get_writer(
+                spill_block_batch_size_, spill_block_wirter));
+
+        while (!eos) {
+            merge_sorted_block_.clear_column_data();
+            RETURN_IF_ERROR(merger_->get_next(&merge_sorted_block_, &eos));

Review Comment:
   If this returns early, spill_block_wirter is never closed, right?
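A scope guard is one way to close the writer on every exit path, including an early return from `RETURN_IF_ERROR(merger_->get_next(...))`. The sketch below uses a hypothetical `FakeSpillWriter` in place of `BlockSpillWriter`, whose destructor behavior is not visible in this diff. `ScopeGuard` and `spill_merged_output` are also illustrative names.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Runs the stored callback when the guard goes out of scope.
class ScopeGuard {
public:
    explicit ScopeGuard(std::function<void()> fn) : fn_(std::move(fn)) {}
    ~ScopeGuard() {
        if (fn_) fn_();
    }
    ScopeGuard(const ScopeGuard&) = delete;
    ScopeGuard& operator=(const ScopeGuard&) = delete;

private:
    std::function<void()> fn_;
};

// Hypothetical stand-in for a spill writer; only records whether close() ran.
struct FakeSpillWriter {
    bool closed = false;
    void close() { closed = true; }
};

// Mirrors the loop in _merge_spilled_blocks: an early return on error
// still closes the writer, because the guard's destructor always runs.
bool spill_merged_output(FakeSpillWriter* writer, int fail_at_batch, int num_batches) {
    ScopeGuard close_writer([writer] { writer->close(); });
    for (int i = 0; i < num_batches; ++i) {
        if (i == fail_at_batch) {
            return false;  // stands in for RETURN_IF_ERROR(merger_->get_next(...))
        }
        // ... write the merged batch ...
    }
    return true;
}
```

A later revision of `BlockSpillReader` in this PR handles the reader side with a destructor (`~BlockSpillReader() { close(); }`). Giving the writer the same treatment would also cover this path.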





[GitHub] [doris] yiguolei commented on a diff in pull request #15624: [feature](spill to disk) Support spill to disk for sort node

Posted by GitBox <gi...@apache.org>.
yiguolei commented on code in PR #15624:
URL: https://github.com/apache/doris/pull/15624#discussion_r1062143853


##########
be/src/vec/common/sort/sorter.cpp:
##########
@@ -17,48 +17,154 @@
 
 #include "vec/common/sort/sorter.h"
 
+#include "runtime/block_spill_manager.h"
 #include "runtime/thread_context.h"
 
 namespace doris::vectorized {
 
-void MergeSorterState::build_merge_tree(SortDescription& sort_description) {
-    for (const auto& block : sorted_blocks) {
-        cursors.emplace_back(block, sort_description);
+// When doing spillable sorting, each sorted block is spilled into a single file.
+//
+// In order to decrease memory pressure when merging
+// multiple spilled blocks into one bigger sorted block, only part
+// of each spilled blocks are read back into memory at a time.
+//
+// Currently the spilled blocks are splitted into small sub blocks,
+// each sub block is serialized in PBlock format and appended
+// to the spill file.
+//
+// This number specifies the maximum size of sub blocks
+static constexpr int BLOCK_SPILL_BATCH_BYTES = 8 * 1024 * 1024;
+
+Status MergeSorterState::add_sorted_block(Block& block) {
+    auto rows = block.rows();
+    if (0 == rows) {
+        return Status::OK();
+    }
+    if (0 == avg_row_bytes_) {
+        avg_row_bytes_ = block.bytes() / rows;
+        spill_block_batch_size_ = (BLOCK_SPILL_BATCH_BYTES + avg_row_bytes_ - 1) / avg_row_bytes_;
     }
 
-    if (sorted_blocks.size() > 1) {
-        for (auto& cursor : cursors) priority_queue.push(MergeSortCursor(&cursor));
+    auto bytes_used = data_size();
+    if (is_spilled_ ||

Review Comment:
   If spill to disk is triggered, the sort profile should include an element indicating it.
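Below is a sketch of what such a profile element could record. It uses a plain `std::map` as a hypothetical stand-in for `RuntimeProfile`, and the counter names are illustrative, not Doris's. Later test code in this PR passes a `profile_` to `get_writer`/`get_reader`, which suggests spill statistics were wired into the profile, but the exact counters are not shown here.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

// Hypothetical stand-in for a runtime profile: named integer counters.
using CounterMap = std::map<std::string, int64_t>;

// Records one spill event; operator[] starts missing counters at 0.
void record_spill(CounterMap* profile, int64_t block_bytes) {
    (*profile)["IsSpilled"] = 1;
    (*profile)["SpillBlockCount"] += 1;
    (*profile)["SpillBytes"] += block_bytes;
}
```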





[GitHub] [doris] github-actions[bot] commented on a diff in pull request #15624: [feature](spill to disk) Support spill to disk for sort node

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on code in PR #15624:
URL: https://github.com/apache/doris/pull/15624#discussion_r1062517667


##########
be/test/vec/core/block_spill_test.cpp:
##########
@@ -55,14 +57,21 @@
         std::vector<StorePath> paths;
         paths.emplace_back(test_data_dir, -1);
         block_spill_manager = std::make_shared<BlockSpillManager>(paths);
+        block_spill_manager->init();
     }
 
     static void TearDownTestSuite() { FileUtils::remove_all(test_data_dir); }
 
 protected:
-    void SetUp() {}
+    void SetUp() {
+        _env = ExecEnv::GetInstance();
+        _env->_block_spill_mgr = block_spill_manager.get();

Review Comment:
   warning: '_block_spill_mgr' is a private member of 'doris::ExecEnv' [clang-diagnostic-error]
   ```cpp
           _env->_block_spill_mgr = block_spill_manager.get();
                 ^
   ```
   **be/src/runtime/exec_env.h:278:** declared private here
   ```cpp
       BlockSpillManager* _block_spill_mgr = nullptr;
                          ^
   ```
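There are two usual fixes for this error: a public setter on `ExecEnv`, or a `friend` declaration for the test fixture. Below is a self-contained sketch of the friend approach. `Env`, `SpillManager` and `TestBlockSpillFixture` are hypothetical stand-ins for `ExecEnv`, `BlockSpillManager` and the real fixture.

```cpp
#include <cassert>

struct SpillManager {};

class Env {
public:
    SpillManager* block_spill_mgr() const { return block_spill_mgr_; }

private:
    // Grants the (hypothetical) test fixture access to private members.
    friend class TestBlockSpillFixture;
    SpillManager* block_spill_mgr_ = nullptr;
};

class TestBlockSpillFixture {
public:
    // Allowed: the fixture itself is a friend of Env.
    void SetUp(Env* env, SpillManager* mgr) { env->block_spill_mgr_ = mgr; }
};
```

Friendship is not inherited, and each `TEST_F` body compiles into a class derived from the fixture. The private access therefore has to happen inside the fixture's own members (such as `SetUp`), not in the test bodies.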
   



##########
be/src/vec/core/block_spill_reader.h:
##########
@@ -25,25 +25,53 @@ namespace vectorized {
 // Read data spilled to local file.
 class BlockSpillReader {
 public:
-    BlockSpillReader(const std::string& file_path) : file_path_(file_path) {}
+    BlockSpillReader(int64_t stream_id, const std::string& file_path, bool delete_after_read = true)
+            : stream_id_(stream_id), file_path_(file_path), delete_after_read_(delete_after_read) {}
+
+    ~BlockSpillReader() { close(); }
 
     Status open();
 
-    Status close() {
-        file_reader_.reset();
-        return Status::OK();
+    Status close();
+
+    Status read(Block** block);
+
+    int64_t get_id() const { return stream_id_; }
+
+    std::string get_path() const { return file_path_; }
+
+    int64_t get_read_time(bool reset = false) {
+        auto time = read_time_;
+        if (reset) {
+            read_time_ = 0;
+        }
+        return time;
     }
 
-    // read a small block, set eof to true if no more data to read
-    Status read(Block& block, bool& eof);
+    int64_t get_deserialize_time(bool reset = false) {
+        auto time = deserialize_time_;
+        if (reset) {
+            deserialize_time_ = 0;
+        }
+        return time;
+    }
 
 private:
+    int64_t stream_id_;
     std::string file_path_;
+    bool delete_after_read_;
     io::FileReaderSPtr file_reader_;
 
     size_t block_count_ = 0;
     size_t read_block_index_ = 0;
+    size_t max_sub_block_size_ = 0;
+    std::unique_ptr<char[]> read_buff_;
     std::vector<size_t> block_start_offsets_;
+    std::unique_ptr<Block> current_block_;
+
+    size_t read_bytes_ = 0;

Review Comment:
   warning: private field 'read_bytes_' is not used [clang-diagnostic-unused-private-field]
   ```cpp
       size_t read_bytes_ = 0;
              ^
   ```
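The existing getters share a read-then-optionally-reset pattern. Besides deleting the field, another way to address the warning is to update `read_bytes_` on each read and expose it the same way. The sketch below uses a hypothetical helper `take_counter` and a reduced `ReaderCounters` class. It uses `int64_t` rather than the field's `size_t` to match the other getters' return type.

```cpp
#include <cassert>
#include <cstdint>

// Returns the current value and optionally resets it, the same pattern as
// get_read_time() / get_deserialize_time() in the diff.
inline int64_t take_counter(int64_t& counter, bool reset) {
    int64_t value = counter;
    if (reset) counter = 0;
    return value;
}

// Hypothetical reduced reader: read_bytes_ is both updated and exposed.
class ReaderCounters {
public:
    void on_block_read(int64_t bytes) { read_bytes_ += bytes; }
    int64_t get_read_bytes(bool reset = false) { return take_counter(read_bytes_, reset); }

private:
    int64_t read_bytes_ = 0;
};
```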
   



##########
be/test/vec/core/block_spill_test.cpp:
##########
@@ -55,14 +57,21 @@ class TestBlockSpill : public testing::Test {
         std::vector<StorePath> paths;
         paths.emplace_back(test_data_dir, -1);
         block_spill_manager = std::make_shared<BlockSpillManager>(paths);
+        block_spill_manager->init();
     }
 
     static void TearDownTestSuite() { FileUtils::remove_all(test_data_dir); }
 
 protected:
-    void SetUp() {}
+    void SetUp() {

Review Comment:
   warning: annotate this function with 'override' or (rarely) 'final' [modernize-use-override]
   
   ```suggestion
       void SetUp() override {
   ```
   





[GitHub] [doris] github-actions[bot] commented on a diff in pull request #15624: [feature](spill to disk) Support spill to disk for sort node

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on code in PR #15624:
URL: https://github.com/apache/doris/pull/15624#discussion_r1064358952


##########
be/test/vec/core/block_spill_test.cpp:
##########
@@ -88,31 +103,31 @@
     vectorized::Block block2({type_and_name2});
 
     vectorized::BlockSpillWriterUPtr spill_block_writer;
-    block_spill_manager->get_writer(batch_size, spill_block_writer);
+    block_spill_manager->get_writer(batch_size, spill_block_writer, profile_);

Review Comment:
   warning: 'profile_' is a private member of 'doris::TestBlockSpill' [clang-diagnostic-error]
   ```cpp
       block_spill_manager->get_writer(batch_size, spill_block_writer, profile_);
                                                                       ^
   ```
   **be/test/vec/core/block_spill_test.cpp:79:** declared private here
   ```cpp
       RuntimeProfile* profile_;
                       ^
   ```
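`TEST_F(Fixture, Name)` expands to a class derived from the fixture. Fixture members used in test bodies therefore need to be `protected` or `public`, so moving `RuntimeProfile* profile_;` from the `private:` section to `protected:` should clear these errors. Below is a gtest-free sketch of the same access rule, with hypothetical names.

```cpp
#include <cassert>

struct Profile {};

class Fixture {
protected:
    // protected: visible to derived classes, such as the ones TEST_F generates.
    // Declared under private: instead, DerivedTest::Run below would not compile.
    Profile* profile_ = nullptr;
};

// Plays the role of the class a TEST_F(Fixture, Name) body compiles into.
class DerivedTest : public Fixture {
public:
    Profile* Run(Profile* p) {
        profile_ = p;  // OK: protected member of the base class
        return profile_;
    }
};
```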
   



##########
be/test/vec/core/block_spill_test.cpp:
##########
@@ -88,31 +103,31 @@
     vectorized::Block block2({type_and_name2});
 
     vectorized::BlockSpillWriterUPtr spill_block_writer;
-    block_spill_manager->get_writer(batch_size, spill_block_writer);
+    block_spill_manager->get_writer(batch_size, spill_block_writer, profile_);
     spill_block_writer->write(block1);
     spill_block_writer->write(block2);
     spill_block_writer->close();
 
     vectorized::BlockSpillReaderUPtr spill_block_reader;
-    block_spill_manager->get_reader(spill_block_writer->get_id(), spill_block_reader);
-    bool eof = false;
-    vectorized::Block block_read;
+    block_spill_manager->get_reader(spill_block_writer->get_id(), spill_block_reader, profile_);

Review Comment:
   warning: 'profile_' is a private member of 'doris::TestBlockSpill' [clang-diagnostic-error]
   ```cpp
       block_spill_manager->get_reader(spill_block_writer->get_id(), spill_block_reader, profile_);
                                                                                         ^
   ```
   **be/test/vec/core/block_spill_test.cpp:79:** declared private here
   ```cpp
       RuntimeProfile* profile_;
                       ^
   ```
   



##########
be/test/vec/core/block_spill_test.cpp:
##########
@@ -55,14 +61,23 @@ class TestBlockSpill : public testing::Test {
         std::vector<StorePath> paths;
         paths.emplace_back(test_data_dir, -1);
         block_spill_manager = std::make_shared<BlockSpillManager>(paths);
+        block_spill_manager->init();
     }
 
     static void TearDownTestSuite() { FileUtils::remove_all(test_data_dir); }
 
 protected:
-    void SetUp() {}
+    void SetUp() {
+        env_ = ExecEnv::GetInstance();
+        env_->_block_spill_mgr = block_spill_manager.get();

Review Comment:
   warning: '_block_spill_mgr' is a private member of 'doris::ExecEnv' [clang-diagnostic-error]
   ```cpp
           env_->_block_spill_mgr = block_spill_manager.get();
                 ^
   ```
   **be/src/runtime/exec_env.h:278:** declared private here
   ```cpp
       BlockSpillManager* _block_spill_mgr = nullptr;
                          ^
   ```
   



##########
be/test/vec/core/block_spill_test.cpp:
##########
@@ -137,20 +152,20 @@
     vectorized::Block block({type_and_name});
 
     vectorized::BlockSpillWriterUPtr spill_block_writer;
-    block_spill_manager->get_writer(batch_size, spill_block_writer);
+    block_spill_manager->get_writer(batch_size, spill_block_writer, profile_);

Review Comment:
   warning: 'profile_' is a private member of 'doris::TestBlockSpill' [clang-diagnostic-error]
   ```cpp
       block_spill_manager->get_writer(batch_size, spill_block_writer, profile_);
                                                                       ^
   ```
   **be/test/vec/core/block_spill_test.cpp:79:** declared private here
   ```cpp
       RuntimeProfile* profile_;
                       ^
   ```
   



##########
be/test/vec/core/block_spill_test.cpp:
##########
@@ -237,22 +254,22 @@
     vectorized::Block block({type_and_name});
 
     vectorized::BlockSpillWriterUPtr spill_block_writer;
-    block_spill_manager->get_writer(batch_size, spill_block_writer);
+    block_spill_manager->get_writer(batch_size, spill_block_writer, profile_);
     Status st = spill_block_writer->write(block);
     spill_block_writer->close();
     EXPECT_TRUE(st.ok());
 
     vectorized::BlockSpillReaderUPtr spill_block_reader;
-    block_spill_manager->get_reader(spill_block_writer->get_id(), spill_block_reader);
-    bool eof = false;
-    vectorized::Block block_read;
+    block_spill_manager->get_reader(spill_block_writer->get_id(), spill_block_reader, profile_);

Review Comment:
   warning: 'profile_' is a private member of 'doris::TestBlockSpill' [clang-diagnostic-error]
   ```cpp
       block_spill_manager->get_reader(spill_block_writer->get_id(), spill_block_reader, profile_);
                                                                                         ^
   ```
   **be/test/vec/core/block_spill_test.cpp:79:** declared private here
   ```cpp
       RuntimeProfile* profile_;
                       ^
   ```
   



##########
be/test/vec/core/block_spill_test.cpp:
##########
@@ -187,32 +203,33 @@
     vectorized::Block block({test_string});
 
     vectorized::BlockSpillWriterUPtr spill_block_writer;
-    block_spill_manager->get_writer(batch_size, spill_block_writer);
+    block_spill_manager->get_writer(batch_size, spill_block_writer, profile_);

Review Comment:
   warning: 'profile_' is a private member of 'doris::TestBlockSpill' [clang-diagnostic-error]
   ```cpp
       block_spill_manager->get_writer(batch_size, spill_block_writer, profile_);
                                                                       ^
   ```
   **be/test/vec/core/block_spill_test.cpp:79:** declared private here
   ```cpp
       RuntimeProfile* profile_;
                       ^
   ```
   



##########
be/test/vec/core/block_spill_test.cpp:
##########
@@ -187,32 +203,33 @@
     vectorized::Block block({test_string});
 
     vectorized::BlockSpillWriterUPtr spill_block_writer;
-    block_spill_manager->get_writer(batch_size, spill_block_writer);
+    block_spill_manager->get_writer(batch_size, spill_block_writer, profile_);
     Status st = spill_block_writer->write(block);
     spill_block_writer->close();
     EXPECT_TRUE(st.ok());
 
     vectorized::BlockSpillReaderUPtr spill_block_reader;
-    block_spill_manager->get_reader(spill_block_writer->get_id(), spill_block_reader);
-    bool eof = false;
-    vectorized::Block block_read;
+    block_spill_manager->get_reader(spill_block_writer->get_id(), spill_block_reader, profile_);

Review Comment:
   warning: 'profile_' is a private member of 'doris::TestBlockSpill' [clang-diagnostic-error]
   ```cpp
       block_spill_manager->get_reader(spill_block_writer->get_id(), spill_block_reader, profile_);
                                                                                         ^
   ```
   **be/test/vec/core/block_spill_test.cpp:79:** declared private here
   ```cpp
       RuntimeProfile* profile_;
                       ^
   ```
   



##########
be/test/vec/core/block_spill_test.cpp:
##########
@@ -294,22 +312,22 @@
     vectorized::Block block({test_decimal});
 
     vectorized::BlockSpillWriterUPtr spill_block_writer;
-    block_spill_manager->get_writer(batch_size, spill_block_writer);
+    block_spill_manager->get_writer(batch_size, spill_block_writer, profile_);
     auto st = spill_block_writer->write(block);
     spill_block_writer->close();
     EXPECT_TRUE(st.ok());
 
     vectorized::BlockSpillReaderUPtr spill_block_reader;
-    block_spill_manager->get_reader(spill_block_writer->get_id(), spill_block_reader);
-    bool eof = false;
-    vectorized::Block block_read;
+    block_spill_manager->get_reader(spill_block_writer->get_id(), spill_block_reader, profile_);

Review Comment:
   warning: 'profile_' is a private member of 'doris::TestBlockSpill' [clang-diagnostic-error]
   ```cpp
       block_spill_manager->get_reader(spill_block_writer->get_id(), spill_block_reader, profile_);
                                                                                         ^
   ```
   **be/test/vec/core/block_spill_test.cpp:79:** declared private here
   ```cpp
       RuntimeProfile* profile_;
                       ^
   ```
   



##########
be/test/vec/core/block_spill_test.cpp:
##########
@@ -237,22 +254,22 @@
     vectorized::Block block({type_and_name});
 
     vectorized::BlockSpillWriterUPtr spill_block_writer;
-    block_spill_manager->get_writer(batch_size, spill_block_writer);
+    block_spill_manager->get_writer(batch_size, spill_block_writer, profile_);

Review Comment:
   warning: 'profile_' is a private member of 'doris::TestBlockSpill' [clang-diagnostic-error]
   ```cpp
       block_spill_manager->get_writer(batch_size, spill_block_writer, profile_);
                                                                       ^
   ```
   **be/test/vec/core/block_spill_test.cpp:79:** declared private here
   ```cpp
       RuntimeProfile* profile_;
                       ^
   ```
   



##########
be/test/vec/core/block_spill_test.cpp:
##########
@@ -349,22 +368,22 @@
     vectorized::Block block({type_and_name});
 
     vectorized::BlockSpillWriterUPtr spill_block_writer;
-    block_spill_manager->get_writer(batch_size, spill_block_writer);
+    block_spill_manager->get_writer(batch_size, spill_block_writer, profile_);

Review Comment:
   warning: 'profile_' is a private member of 'doris::TestBlockSpill' [clang-diagnostic-error]
   ```cpp
       block_spill_manager->get_writer(batch_size, spill_block_writer, profile_);
                                                                       ^
   ```
   **be/test/vec/core/block_spill_test.cpp:79:** declared private here
   ```cpp
       RuntimeProfile* profile_;
                       ^
   ```
   



##########
be/test/vec/core/block_spill_test.cpp:
##########
@@ -413,34 +433,35 @@
     vectorized::Block block({type_and_name});
 
     vectorized::BlockSpillWriterUPtr spill_block_writer;
-    block_spill_manager->get_writer(batch_size, spill_block_writer);
+    block_spill_manager->get_writer(batch_size, spill_block_writer, profile_);
     auto st = spill_block_writer->write(block);
     spill_block_writer->close();
     EXPECT_TRUE(st.ok());
 
     vectorized::BlockSpillReaderUPtr spill_block_reader;
-    block_spill_manager->get_reader(spill_block_writer->get_id(), spill_block_reader);
-    bool eof = false;
-    vectorized::Block block_read;
+    block_spill_manager->get_reader(spill_block_writer->get_id(), spill_block_reader, profile_);

Review Comment:
   warning: 'profile_' is a private member of 'doris::TestBlockSpill' [clang-diagnostic-error]
   ```cpp
       block_spill_manager->get_reader(spill_block_writer->get_id(), spill_block_reader, profile_);
                                                                                         ^
   ```
   **be/test/vec/core/block_spill_test.cpp:79:** declared private here
   ```cpp
       RuntimeProfile* profile_;
                       ^
   ```
   



##########
be/test/vec/core/block_spill_test.cpp:
##########
@@ -349,22 +368,22 @@
     vectorized::Block block({type_and_name});
 
     vectorized::BlockSpillWriterUPtr spill_block_writer;
-    block_spill_manager->get_writer(batch_size, spill_block_writer);
+    block_spill_manager->get_writer(batch_size, spill_block_writer, profile_);
     auto st = spill_block_writer->write(block);
     spill_block_writer->close();
     EXPECT_TRUE(st.ok());
 
     vectorized::BlockSpillReaderUPtr spill_block_reader;
-    block_spill_manager->get_reader(spill_block_writer->get_id(), spill_block_reader);
-    bool eof = false;
-    vectorized::Block block_read;
+    block_spill_manager->get_reader(spill_block_writer->get_id(), spill_block_reader, profile_);

Review Comment:
   warning: 'profile_' is a private member of 'doris::TestBlockSpill' [clang-diagnostic-error]
   ```cpp
       block_spill_manager->get_reader(spill_block_writer->get_id(), spill_block_reader, profile_);
                                                                                         ^
   ```
   **be/test/vec/core/block_spill_test.cpp:79:** declared private here
   ```cpp
       RuntimeProfile* profile_;
                       ^
   ```
   



##########
be/test/vec/core/block_spill_test.cpp:
##########
@@ -294,22 +312,22 @@
     vectorized::Block block({test_decimal});
 
     vectorized::BlockSpillWriterUPtr spill_block_writer;
-    block_spill_manager->get_writer(batch_size, spill_block_writer);
+    block_spill_manager->get_writer(batch_size, spill_block_writer, profile_);

Review Comment:
   warning: 'profile_' is a private member of 'doris::TestBlockSpill' [clang-diagnostic-error]
   ```cpp
       block_spill_manager->get_writer(batch_size, spill_block_writer, profile_);
                                                                       ^
   ```
   **be/test/vec/core/block_spill_test.cpp:79:** declared private here
   ```cpp
       RuntimeProfile* profile_;
                       ^
   ```
   



##########
be/test/vec/core/block_spill_test.cpp:
##########
@@ -137,20 +152,20 @@
     vectorized::Block block({type_and_name});
 
     vectorized::BlockSpillWriterUPtr spill_block_writer;
-    block_spill_manager->get_writer(batch_size, spill_block_writer);
+    block_spill_manager->get_writer(batch_size, spill_block_writer, profile_);
     spill_block_writer->write(block);
     spill_block_writer->close();
 
     vectorized::BlockSpillReaderUPtr spill_block_reader;
-    block_spill_manager->get_reader(spill_block_writer->get_id(), spill_block_reader);
-    bool eof = false;
-    vectorized::Block block_read;
+    block_spill_manager->get_reader(spill_block_writer->get_id(), spill_block_reader, profile_);

Review Comment:
   warning: 'profile_' is a private member of 'doris::TestBlockSpill' [clang-diagnostic-error]
   ```cpp
       block_spill_manager->get_reader(spill_block_writer->get_id(), spill_block_reader, profile_);
                                                                                         ^
   ```
   **be/test/vec/core/block_spill_test.cpp:79:** declared private here
   ```cpp
       RuntimeProfile* profile_;
                       ^
   ```
   



##########
be/test/vec/core/block_spill_test.cpp:
##########
@@ -413,34 +433,35 @@
     vectorized::Block block({type_and_name});
 
     vectorized::BlockSpillWriterUPtr spill_block_writer;
-    block_spill_manager->get_writer(batch_size, spill_block_writer);
+    block_spill_manager->get_writer(batch_size, spill_block_writer, profile_);

Review Comment:
   warning: 'profile_' is a private member of 'doris::TestBlockSpill' [clang-diagnostic-error]
   ```cpp
       block_spill_manager->get_writer(batch_size, spill_block_writer, profile_);
                                                                       ^
   ```
   **be/test/vec/core/block_spill_test.cpp:79:** declared private here
   ```cpp
       RuntimeProfile* profile_;
                       ^
   ```
   





[GitHub] [doris] yiguolei commented on a diff in pull request #15624: [feature](spill to disk) Support spill to disk for sort node

Posted by GitBox <gi...@apache.org>.
yiguolei commented on code in PR #15624:
URL: https://github.com/apache/doris/pull/15624#discussion_r1062142518


##########
be/src/common/config.h:
##########
@@ -874,6 +874,12 @@ CONF_Int32(pipeline_executor_size, "0");
 // Will remove after fully test.
 CONF_Bool(enable_index_apply_preds_except_leafnode_of_andnode, "false");
 
+// If the memory consumption of sort node exceeds this limit, spill to disk is triggered;
+// Set to 0 to disable; defaults to 1G; min: 128M
+CONF_mInt64(external_sort_bytes_threshold, "1073741824");

Review Comment:
   The default value should be 0 or -1, meaning spilling is disabled. Users who want to enable it should set a value larger than 0.
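A minimal sketch of the spill decision in `add_sorted_block` under the proposed default, assuming a threshold of 0 or less means "never spill". The comparison and the ceiling division that turns the 8 MiB sub-block budget into a row count are taken from the diff; the free functions `should_spill` and `spill_batch_rows` are illustrative only, not Doris APIs.

```cpp
#include <cassert>
#include <cstdint>

// Mirrors BLOCK_SPILL_BATCH_BYTES in the diff (8 MiB per spilled sub block).
constexpr int64_t kBlockSpillBatchBytes = 8LL * 1024 * 1024;

// Spill decision with "threshold <= 0 disables spilling" as the default.
// The byte comparison is the one used in MergeSorterState::add_sorted_block.
bool should_spill(bool already_spilled, int64_t bytes_used, int64_t incoming_bytes,
                  int64_t threshold) {
    if (already_spilled) return true;  // once spilled, every later block goes to disk
    if (threshold <= 0) return false;  // 0 or -1: spilling disabled
    return bytes_used + incoming_bytes >= threshold - kBlockSpillBatchBytes;
}

// Rows per spilled sub block: ceil(8 MiB / avg_row_bytes), as in add_sorted_block.
int64_t spill_batch_rows(int64_t avg_row_bytes) {
    assert(avg_row_bytes > 0);  // the diff would divide by zero here
    return (kBlockSpillBatchBytes + avg_row_bytes - 1) / avg_row_bytes;
}
```

With this formula, any positive threshold below 8 MiB makes `threshold - BLOCK_SPILL_BATCH_BYTES` negative, so every block would spill; the documented 128 MiB minimum avoids that case.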





[GitHub] [doris] yiguolei commented on a diff in pull request #15624: [feature](spill to disk) Support spill to disk for sort node

Posted by GitBox <gi...@apache.org>.
yiguolei commented on code in PR #15624:
URL: https://github.com/apache/doris/pull/15624#discussion_r1062191883


##########
be/src/vec/common/sort/sorter.cpp:
##########
@@ -17,48 +17,154 @@
 
 #include "vec/common/sort/sorter.h"
 
+#include "runtime/block_spill_manager.h"
 #include "runtime/thread_context.h"
 
 namespace doris::vectorized {
 
-void MergeSorterState::build_merge_tree(SortDescription& sort_description) {
-    for (const auto& block : sorted_blocks) {
-        cursors.emplace_back(block, sort_description);
+// When doing spillable sorting, each sorted block is spilled into a single file.
+//
+// In order to decrease memory pressure when merging
+// multiple spilled blocks into one bigger sorted block, only part
+// of each spilled block is read back into memory at a time.
+//
+// Currently the spilled blocks are split into small sub blocks,
+// each sub block is serialized in PBlock format and appended
+// to the spill file.
+//
+// This number specifies the maximum size of sub blocks
+static constexpr int BLOCK_SPILL_BATCH_BYTES = 8 * 1024 * 1024;
+
+Status MergeSorterState::add_sorted_block(Block& block) {
+    auto rows = block.rows();
+    if (0 == rows) {
+        return Status::OK();
+    }
+    if (0 == avg_row_bytes_) {
+        avg_row_bytes_ = block.bytes() / rows;
+        spill_block_batch_size_ = (BLOCK_SPILL_BATCH_BYTES + avg_row_bytes_ - 1) / avg_row_bytes_;
     }
 
-    if (sorted_blocks.size() > 1) {
-        for (auto& cursor : cursors) priority_queue.push(MergeSortCursor(&cursor));
+    auto bytes_used = data_size();
+    if (is_spilled_ ||
+        (config::external_sort_bytes_threshold > 0 &&
+         (bytes_used + block.allocated_bytes()) >=
+                 (config::external_sort_bytes_threshold - BLOCK_SPILL_BATCH_BYTES))) {
+        is_spilled_ = true;
+        BlockSpillWriterUPtr spill_block_wirter;
+        RETURN_IF_ERROR(ExecEnv::GetInstance()->block_spill_mgr()->get_writer(
+                spill_block_batch_size_, spill_block_wirter));
+
+        RETURN_IF_ERROR(spill_block_wirter->write(block));
+        spilled_sorted_block_streams_.emplace_back(spill_block_wirter->get_id());
+
+        spill_block_wirter->close();
+
+        if (init_merge_sorted_block_) {
+            init_merge_sorted_block_ = false;
+            merge_sorted_block_ = block.clone_empty();
+        }
+    } else {
+        sorted_blocks_.emplace_back(std::move(block));
     }
+    num_rows_ += rows;
+    return Status::OK();
+}
+
+Status MergeSorterState::_build_merge_tree_not_spilled(const SortDescription& sort_description) {
+    for (const auto& block : sorted_blocks_) {
+        cursors_.emplace_back(block, sort_description);
+    }
+
+    if (sorted_blocks_.size() > 1) {
+        for (auto& cursor : cursors_) priority_queue_.push(MergeSortCursor(&cursor));
+    }
+    return Status::OK();
+}
+
+Status MergeSorterState::build_merge_tree(const SortDescription& sort_description) {
+    _build_merge_tree_not_spilled(sort_description);
+
+    if (spilled_sorted_block_streams_.size() > 0) {
+        if (sorted_blocks_.size() > 0) {
+            BlockSpillWriterUPtr spill_block_wirter;
+            RETURN_IF_ERROR(ExecEnv::GetInstance()->block_spill_mgr()->get_writer(
+                    spill_block_batch_size_, spill_block_wirter));
+
+            LOG(WARNING) << "spill to disk: build merge tree, in memory sorted blocks: "
+                         << sorted_blocks_.size();
+            if (sorted_blocks_.size() == 1) {
+                RETURN_IF_ERROR(spill_block_wirter->write(sorted_blocks_[0]));
+            } else {
+                bool eos = false;
+
+                // merge blocks in memory and write merge result to disk
+                while (!eos) {
+                    merge_sorted_block_.clear_column_data();
+                    _merge_sort_read_not_spilled(spill_block_batch_size_, &merge_sorted_block_,
+                                                 &eos);
+                    RETURN_IF_ERROR(spill_block_wirter->write(merge_sorted_block_));
+                }
+            }
+            spilled_sorted_block_streams_.emplace_back(spill_block_wirter->get_id());
+            spill_block_wirter->close();
+        }
+        RETURN_IF_ERROR(_merge_spilled_blocks(sort_description));
+    }
+    return Status::OK();
 }
 
 Status MergeSorterState::merge_sort_read(doris::RuntimeState* state,
                                          doris::vectorized::Block* block, bool* eos) {
-    size_t num_columns = sorted_blocks[0].columns();
+    Status status = Status::OK();
+    if (is_spilled_) {
+        status = merger_->get_next(block, eos);
+        LOG(WARNING) << "spill to disk: merger_->get_next, eos: " << *eos;

Review Comment:
   Is `LOG(WARNING)` really needed here?
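For readers following `merge_sort_read`: when nothing has been spilled, `_build_merge_tree_not_spilled` pushes one cursor per in-memory sorted block into `priority_queue_`, and each read pops the smallest row until `batch_size` rows have been produced. The sketch below models that k-way merge on sorted `std::vector<int>` runs. It is a simplification (single integer key, no `offset_`/`limit_` handling), not the `MergeSortCursor` implementation; `InMemoryMerger` is a hypothetical name.

```cpp
#include <cassert>
#include <cstddef>
#include <queue>
#include <utility>
#include <vector>

// Each sorted block is a sorted vector<int>; a cursor is (value, block, row),
// and a min-heap on the current value plays the role of priority_queue_.
class InMemoryMerger {
public:
    explicit InMemoryMerger(std::vector<std::vector<int>> sorted_blocks)
            : blocks_(std::move(sorted_blocks)) {
        for (size_t i = 0; i < blocks_.size(); ++i) {
            if (!blocks_[i].empty()) heap_.push({blocks_[i][0], i, 0});
        }
    }

    // Emits up to batch_size rows in ascending order; sets *eos once drained.
    std::vector<int> read(size_t batch_size, bool* eos) {
        assert(batch_size > 0);
        std::vector<int> out;
        while (!heap_.empty() && out.size() < batch_size) {
            Cursor c = heap_.top();
            heap_.pop();
            out.push_back(c.value);
            // Advance this block's cursor and re-insert it if rows remain.
            if (c.row + 1 < blocks_[c.block].size()) {
                heap_.push({blocks_[c.block][c.row + 1], c.block, c.row + 1});
            }
        }
        *eos = heap_.empty();
        return out;
    }

private:
    struct Cursor {
        int value;
        size_t block;
        size_t row;
        // std::priority_queue is a max-heap; invert so the smallest value is on top.
        bool operator<(const Cursor& other) const { return value > other.value; }
    };
    std::vector<std::vector<int>> blocks_;
    std::priority_queue<Cursor> heap_;
};
```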





[GitHub] [doris] yiguolei commented on a diff in pull request #15624: [feature](spill to disk) Support spill to disk for sort node

Posted by GitBox <gi...@apache.org>.
yiguolei commented on code in PR #15624:
URL: https://github.com/apache/doris/pull/15624#discussion_r1062199064


##########
be/src/vec/common/sort/sorter.cpp:
##########
@@ -17,48 +17,154 @@
 
 #include "vec/common/sort/sorter.h"
 
+#include "runtime/block_spill_manager.h"
 #include "runtime/thread_context.h"
 
 namespace doris::vectorized {
 
-void MergeSorterState::build_merge_tree(SortDescription& sort_description) {
-    for (const auto& block : sorted_blocks) {
-        cursors.emplace_back(block, sort_description);
+// When doing spillable sorting, each sorted block is spilled into a single file.
+//
+// In order to decrease memory pressure when merging
+// multiple spilled blocks into one bigger sorted block, only part
+// of each spilled block is read back into memory at a time.
+//
+// Currently the spilled blocks are split into small sub blocks,
+// each sub block is serialized in PBlock format and appended
+// to the spill file.
+//
+// This number specifies the maximum size of sub blocks
+static constexpr int BLOCK_SPILL_BATCH_BYTES = 8 * 1024 * 1024;
+
+Status MergeSorterState::add_sorted_block(Block& block) {
+    auto rows = block.rows();
+    if (0 == rows) {
+        return Status::OK();
+    }
+    if (0 == avg_row_bytes_) {
+        avg_row_bytes_ = block.bytes() / rows;
+        spill_block_batch_size_ = (BLOCK_SPILL_BATCH_BYTES + avg_row_bytes_ - 1) / avg_row_bytes_;
     }
 
-    if (sorted_blocks.size() > 1) {
-        for (auto& cursor : cursors) priority_queue.push(MergeSortCursor(&cursor));
+    auto bytes_used = data_size();
+    if (is_spilled_ ||
+        (config::external_sort_bytes_threshold > 0 &&
+         (bytes_used + block.allocated_bytes()) >=
+                 (config::external_sort_bytes_threshold - BLOCK_SPILL_BATCH_BYTES))) {
+        is_spilled_ = true;
+        BlockSpillWriterUPtr spill_block_wirter;
+        RETURN_IF_ERROR(ExecEnv::GetInstance()->block_spill_mgr()->get_writer(
+                spill_block_batch_size_, spill_block_wirter));
+
+        RETURN_IF_ERROR(spill_block_wirter->write(block));
+        spilled_sorted_block_streams_.emplace_back(spill_block_wirter->get_id());
+
+        spill_block_wirter->close();
+
+        if (init_merge_sorted_block_) {
+            init_merge_sorted_block_ = false;
+            merge_sorted_block_ = block.clone_empty();
+        }
+    } else {
+        sorted_blocks_.emplace_back(std::move(block));
     }
+    num_rows_ += rows;
+    return Status::OK();
+}
+
+Status MergeSorterState::_build_merge_tree_not_spilled(const SortDescription& sort_description) {
+    for (const auto& block : sorted_blocks_) {
+        cursors_.emplace_back(block, sort_description);
+    }
+
+    if (sorted_blocks_.size() > 1) {
+        for (auto& cursor : cursors_) priority_queue_.push(MergeSortCursor(&cursor));
+    }
+    return Status::OK();
+}
+
+Status MergeSorterState::build_merge_tree(const SortDescription& sort_description) {
+    _build_merge_tree_not_spilled(sort_description);
+
+    if (spilled_sorted_block_streams_.size() > 0) {
+        if (sorted_blocks_.size() > 0) {
+            BlockSpillWriterUPtr spill_block_wirter;
+            RETURN_IF_ERROR(ExecEnv::GetInstance()->block_spill_mgr()->get_writer(
+                    spill_block_batch_size_, spill_block_wirter));
+
+            LOG(WARNING) << "spill to disk: build merge tree, in memory sorted blocks: "
+                         << sorted_blocks_.size();
+            if (sorted_blocks_.size() == 1) {
+                RETURN_IF_ERROR(spill_block_wirter->write(sorted_blocks_[0]));
+            } else {
+                bool eos = false;
+
+                // merge blocks in memory and write merge result to disk
+                while (!eos) {
+                    merge_sorted_block_.clear_column_data();
+                    _merge_sort_read_not_spilled(spill_block_batch_size_, &merge_sorted_block_,
+                                                 &eos);
+                    RETURN_IF_ERROR(spill_block_wirter->write(merge_sorted_block_));
+                }
+            }
+            spilled_sorted_block_streams_.emplace_back(spill_block_wirter->get_id());
+            spill_block_wirter->close();
+        }
+        RETURN_IF_ERROR(_merge_spilled_blocks(sort_description));
+    }
+    return Status::OK();
 }
 
 Status MergeSorterState::merge_sort_read(doris::RuntimeState* state,
                                          doris::vectorized::Block* block, bool* eos) {
-    size_t num_columns = sorted_blocks[0].columns();
+    Status status = Status::OK();
+    if (is_spilled_) {
+        status = merger_->get_next(block, eos);
+        LOG(WARNING) << "spill to disk: merger_->get_next, eos: " << *eos;
+    } else {
+        LOG(WARNING) << "spill to disk: _merge_sort_read_not_spilled, eos: " << *eos;
+        if (sorted_blocks_.empty()) {
+            *eos = true;
+        } else if (sorted_blocks_.size() == 1) {
+            if (offset_ != 0) {
+                sorted_blocks_[0].skip_num_rows(offset_);
+            }
+            block->swap(sorted_blocks_[0]);
+            *eos = true;
+        } else {
+            status = _merge_sort_read_not_spilled(state->batch_size(), block, eos);
+        }
+    }
+    return status;
+}
+
+Status MergeSorterState::_merge_sort_read_not_spilled(int batch_size,
+                                                      doris::vectorized::Block* block, bool* eos) {

Review Comment:
   Can this method return a non-OK status? If so, that status is not checked where the method is called.
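If `_merge_sort_read_not_spilled` can fail, the loop in `build_merge_tree` that merges the in-memory blocks and writes them to disk should propagate that failure the same way it already does for `write`. A sketch under stated assumptions: `Status`, `RETURN_IF_ERROR`, `FakeMergeSource` and `FakeSpillWriter` below are minimal stand-ins written for this example, not the Doris definitions.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Minimal stand-in for doris::Status (only ok() and a message).
class Status {
public:
    static Status OK() { return Status(true, ""); }
    static Status IOError(std::string msg) { return Status(false, std::move(msg)); }
    bool ok() const { return ok_; }
    const std::string& msg() const { return msg_; }

private:
    Status(bool ok, std::string msg) : ok_(ok), msg_(std::move(msg)) {}
    bool ok_;
    std::string msg_;
};

// Same shape as RETURN_IF_ERROR in the diff: return on the first error.
#define RETURN_IF_ERROR(stmt)      \
    do {                           \
        Status _st = (stmt);       \
        if (!_st.ok()) return _st; \
    } while (false)

// Hypothetical stand-in for _merge_sort_read_not_spilled: one row per batch,
// eos after `batches_left` batches.
struct FakeMergeSource {
    int batches_left;
    Status read(std::vector<int>* batch, bool* eos) {
        batch->assign(1, batches_left);
        *eos = (--batches_left == 0);
        return Status::OK();
    }
};

// Hypothetical stand-in for BlockSpillWriter::write; the `fail_on_write`-th
// call (1-based) fails, 0 means never.
struct FakeSpillWriter {
    int fail_on_write;
    int writes = 0;
    Status write(const std::vector<int>& /*batch*/) {
        ++writes;
        return writes == fail_on_write ? Status::IOError("disk full") : Status::OK();
    }
};

// The "merge in memory, write to disk" loop with the read status checked too.
Status spill_in_memory_merge(FakeMergeSource& source, FakeSpillWriter& writer) {
    bool eos = false;
    std::vector<int> batch;
    while (!eos) {
        batch.clear();
        RETURN_IF_ERROR(source.read(&batch, &eos));
        RETURN_IF_ERROR(writer.write(batch));
    }
    return Status::OK();
}
```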





[GitHub] [doris] yiguolei commented on a diff in pull request #15624: [feature](spill to disk) Support spill to disk for sort node

Posted by GitBox <gi...@apache.org>.
yiguolei commented on code in PR #15624:
URL: https://github.com/apache/doris/pull/15624#discussion_r1062196061


##########
be/src/vec/common/sort/sorter.cpp:
##########
@@ -17,48 +17,154 @@
 
 #include "vec/common/sort/sorter.h"
 
+#include "runtime/block_spill_manager.h"
 #include "runtime/thread_context.h"
 
 namespace doris::vectorized {
 
-void MergeSorterState::build_merge_tree(SortDescription& sort_description) {
-    for (const auto& block : sorted_blocks) {
-        cursors.emplace_back(block, sort_description);
+// When doing spillable sorting, each sorted block is spilled into a single file.
+//
+// In order to decrease memory pressure when merging
+// multiple spilled blocks into one bigger sorted block, only part
+// of each spilled block is read back into memory at a time.
+//
+// Currently the spilled blocks are split into small sub blocks,
+// each sub block is serialized in PBlock format and appended
+// to the spill file.
+//
+// This number specifies the maximum size of sub blocks
+static constexpr int BLOCK_SPILL_BATCH_BYTES = 8 * 1024 * 1024;
+
+Status MergeSorterState::add_sorted_block(Block& block) {
+    auto rows = block.rows();
+    if (0 == rows) {
+        return Status::OK();
+    }
+    if (0 == avg_row_bytes_) {
+        avg_row_bytes_ = block.bytes() / rows;
+        spill_block_batch_size_ = (BLOCK_SPILL_BATCH_BYTES + avg_row_bytes_ - 1) / avg_row_bytes_;
     }
 
-    if (sorted_blocks.size() > 1) {
-        for (auto& cursor : cursors) priority_queue.push(MergeSortCursor(&cursor));
+    auto bytes_used = data_size();
+    if (is_spilled_ ||
+        (config::external_sort_bytes_threshold > 0 &&
+         (bytes_used + block.allocated_bytes()) >=
+                 (config::external_sort_bytes_threshold - BLOCK_SPILL_BATCH_BYTES))) {
+        is_spilled_ = true;
+        BlockSpillWriterUPtr spill_block_wirter;
+        RETURN_IF_ERROR(ExecEnv::GetInstance()->block_spill_mgr()->get_writer(
+                spill_block_batch_size_, spill_block_wirter));
+
+        RETURN_IF_ERROR(spill_block_wirter->write(block));
+        spilled_sorted_block_streams_.emplace_back(spill_block_wirter->get_id());
+
+        spill_block_wirter->close();
+
+        if (init_merge_sorted_block_) {
+            init_merge_sorted_block_ = false;
+            merge_sorted_block_ = block.clone_empty();
+        }
+    } else {
+        sorted_blocks_.emplace_back(std::move(block));
     }
+    num_rows_ += rows;
+    return Status::OK();
+}
+
+Status MergeSorterState::_build_merge_tree_not_spilled(const SortDescription& sort_description) {
+    for (const auto& block : sorted_blocks_) {
+        cursors_.emplace_back(block, sort_description);
+    }
+
+    if (sorted_blocks_.size() > 1) {
+        for (auto& cursor : cursors_) priority_queue_.push(MergeSortCursor(&cursor));
+    }
+    return Status::OK();
+}
+
+Status MergeSorterState::build_merge_tree(const SortDescription& sort_description) {

Review Comment:
   `TopNSorter::prepare_for_read()`, for example, drops the returned status:
   
       Status TopNSorter::prepare_for_read() {
           _state->build_merge_tree(_sort_description);
           return Status::OK();
       }
   
   Please check every call site of `build_merge_tree` and make sure the returned status is checked; the call sites I found do not check it.
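A sketch of the caller side with hypothetical stub types: `prepare_for_read` forwards the status from `build_merge_tree` instead of returning `Status::OK()` unconditionally. Marking the status type `[[nodiscard]]` (standard C++17) is one way to have the compiler point out any remaining call site that drops it; whether Doris' own `Status` should carry that attribute is a separate decision.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Minimal stand-in for doris::Status; [[nodiscard]] makes the compiler warn
// whenever a returned Status is silently ignored.
class [[nodiscard]] Status {
public:
    static Status OK() { return Status(true, ""); }
    static Status IOError(std::string msg) { return Status(false, std::move(msg)); }
    bool ok() const { return ok_; }
    const std::string& msg() const { return msg_; }

private:
    Status(bool ok, std::string msg) : ok_(ok), msg_(std::move(msg)) {}
    bool ok_;
    std::string msg_;
};

#define RETURN_IF_ERROR(stmt)      \
    do {                           \
        Status _st = (stmt);       \
        if (!_st.ok()) return _st; \
    } while (false)

// Hypothetical stand-in for MergeSorterState: build_merge_tree can now fail,
// e.g. when writing the remaining in-memory blocks to a spill file fails.
struct MergeSorterStateStub {
    bool fail_build = false;
    bool built = false;
    Status build_merge_tree() {
        if (fail_build) return Status::IOError("spill write failed");
        built = true;
        return Status::OK();
    }
};

// Hypothetical stand-in for TopNSorter with the status propagated.
struct TopNSorterStub {
    MergeSorterStateStub state;
    Status prepare_for_read() {
        RETURN_IF_ERROR(state.build_merge_tree());
        return Status::OK();
    }
};
```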





[GitHub] [doris] yiguolei commented on a diff in pull request #15624: [feature](spill to disk) Support spill to disk for sort node

Posted by GitBox <gi...@apache.org>.
yiguolei commented on code in PR #15624:
URL: https://github.com/apache/doris/pull/15624#discussion_r1062206515


##########
be/src/vec/common/sort/sorter.cpp:
##########
@@ -67,13 +173,71 @@ Status MergeSorterState::merge_sort_read(doris::RuntimeState* state,
     }
 
     if (!mem_reuse) {
-        Block merge_block = sorted_blocks[0].clone_with_columns(std::move(merged_columns));
+        Block merge_block = sorted_blocks_[0].clone_with_columns(std::move(merged_columns));
         merge_block.swap(*block);
     }
 
     return Status::OK();
 }
 
+int MergeSorterState::_calc_spill_blocks_to_merge() const {
+    return config::external_sort_bytes_threshold / BLOCK_SPILL_BATCH_BYTES;
+}
+
+// merge all the intermediate spilled blocks
+Status MergeSorterState::_merge_spilled_blocks(const SortDescription& sort_description) {
+    int num_of_blocks_to_merge = _calc_spill_blocks_to_merge();
+    while (true) {
+        // pick some spilled blocks to merge, and spill the merged result
+        // to disk, until all spilled blocks can be merged in a run.
+        RETURN_IF_ERROR(_create_intermediate_merger(num_of_blocks_to_merge, sort_description));
+        if (spilled_sorted_block_streams_.empty()) {
+            LOG(WARNING) << "spill to disk: _merge_spilled_blocks, can merger all spilled block in "
+                            "memory";
+            break;
+        }
+
+        bool eos = false;
+
+        BlockSpillWriterUPtr spill_block_wirter;
+        RETURN_IF_ERROR(ExecEnv::GetInstance()->block_spill_mgr()->get_writer(
+                spill_block_batch_size_, spill_block_wirter));
+
+        while (!eos) {
+            merge_sorted_block_.clear_column_data();
+            RETURN_IF_ERROR(merger_->get_next(&merge_sorted_block_, &eos));
+            RETURN_IF_ERROR(spill_block_wirter->write(merge_sorted_block_));
+        }
+        spilled_sorted_block_streams_.emplace_back(spill_block_wirter->get_id());
+        spill_block_wirter->close();
+    }
+    return Status::OK();
+}
+
+Status MergeSorterState::_create_intermediate_merger(int num_blocks,
+                                                     const SortDescription& sort_description) {
+    _reset_block_spill_readers();
+
+    std::vector<BlockSupplier> child_block_suppliers;
+    merger_.reset(new VSortedRunMerger(sort_description, spill_block_batch_size_, limit_, offset_,
+                                       profile_));
+
+    for (int i = 0; i < num_blocks && !spilled_sorted_block_streams_.empty(); ++i) {
+        auto stream_id = spilled_sorted_block_streams_.front();
+        BlockSpillReaderUPtr spilled_block_reader;

Review Comment:
   Is the reader ever closed? It might be better to have both the reader and the writer call `close()` automatically in their destructors.
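One way to implement this suggestion is plain RAII: the destructor calls an idempotent `close()`, so every early `RETURN_IF_ERROR` path in `_create_intermediate_merger` or `_merge_spilled_blocks` still releases the spill stream. `SpillReaderSketch`, `OpenStreamCounter` and `open_readers` below are hypothetical; a real `BlockSpillReader`/`BlockSpillWriter` would close its file handle instead of decrementing a counter.

```cpp
#include <cassert>
#include <memory>
#include <vector>

// Hypothetical bookkeeping standing in for open spill files.
struct OpenStreamCounter {
    int open = 0;
};

// close() is idempotent and the destructor calls it, so the stream is
// released on every exit path, including early error returns.
class SpillReaderSketch {
public:
    explicit SpillReaderSketch(OpenStreamCounter* counter) : counter_(counter) {
        ++counter_->open;
    }
    ~SpillReaderSketch() { close(); }

    SpillReaderSketch(const SpillReaderSketch&) = delete;
    SpillReaderSketch& operator=(const SpillReaderSketch&) = delete;

    void close() {
        if (closed_) return;  // explicit close() followed by the destructor is safe
        assert(counter_->open > 0);
        --counter_->open;
        closed_ = true;
    }

private:
    OpenStreamCounter* counter_;
    bool closed_ = false;
};

// Opens n readers, roughly as a merger collects its inputs; with fail_midway
// it returns early, as a RETURN_IF_ERROR would, without closing anything.
int open_readers(OpenStreamCounter* counter, int n, bool fail_midway) {
    std::vector<std::unique_ptr<SpillReaderSketch>> readers;
    for (int i = 0; i < n; ++i) {
        readers.push_back(std::make_unique<SpillReaderSketch>(counter));
        if (fail_midway && i == n / 2) return -1;
    }
    return counter->open;  // evaluated before the readers are destroyed
}
```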





[GitHub] [doris] yiguolei commented on a diff in pull request #15624: [feature](spill to disk) Support spill to disk for sort node

Posted by GitBox <gi...@apache.org>.
yiguolei commented on code in PR #15624:
URL: https://github.com/apache/doris/pull/15624#discussion_r1062051965


##########
be/src/common/config.h:
##########
@@ -874,6 +874,12 @@ CONF_Int32(pipeline_executor_size, "0");
 // Will remove after fully test.
 CONF_Bool(enable_index_apply_preds_except_leafnode_of_andnode, "false");
 
+// If the memory consumption of sort node exceeds this limit, spill to disk is triggered;
+// Set to 0 to disable; defaults to 1G; min: 128M
+CONF_mInt64(external_sort_bytes_threshold, "1073741824");

Review Comment:
   This should not be in be.conf; pass it from the FE as a session variable instead.
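Wherever the threshold ends up being configured, this diff uses it for more than triggering spills: `_calc_spill_blocks_to_merge` sets the merge fan-in to `external_sort_bytes_threshold / BLOCK_SPILL_BATCH_BYTES` (128 for the 1 GiB default, 16 for the documented 128 MiB minimum), and `_merge_spilled_blocks` keeps merging that many spilled streams into a new one until the rest fit in a single final merge. The sketch below models this on in-memory vectors. It uses pairwise `std::merge` instead of a heap-based `VSortedRunMerger`, and clamping the fan-in to at least 2 is an assumption of the sketch: with a fan-in of 0 or 1 and more than one stream, the loop as modelled here would never reduce the number of streams.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <deque>
#include <iterator>
#include <utility>
#include <vector>

constexpr int64_t kBlockSpillBatchBytes = 8LL * 1024 * 1024;

// threshold / BLOCK_SPILL_BATCH_BYTES, clamped to >= 2 (sketch assumption).
int fan_in(int64_t threshold_bytes) {
    return static_cast<int>(std::max<int64_t>(2, threshold_bytes / kBlockSpillBatchBytes));
}

// Models _merge_spilled_blocks: take up to k streams from the front; if none
// remain, they form the final merge; otherwise the merged run is appended as
// a new "spilled" stream and the loop continues.
std::vector<int> multi_pass_merge(std::deque<std::vector<int>> streams, int k,
                                  int* intermediate_merges) {
    assert(k >= 2);
    *intermediate_merges = 0;
    while (true) {
        std::vector<int> merged;
        for (int i = 0; i < k && !streams.empty(); ++i) {
            std::vector<int> next;
            std::merge(merged.begin(), merged.end(), streams.front().begin(),
                       streams.front().end(), std::back_inserter(next));
            merged.swap(next);
            streams.pop_front();
        }
        if (streams.empty()) return merged;    // final run, served to the caller
        streams.push_back(std::move(merged));  // "spill" the intermediate run
        ++*intermediate_merges;
    }
}
```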





[GitHub] [doris] yiguolei commented on a diff in pull request #15624: [feature](spill to disk) Support spill to disk for sort node

Posted by GitBox <gi...@apache.org>.
yiguolei commented on code in PR #15624:
URL: https://github.com/apache/doris/pull/15624#discussion_r1062145039


##########
be/src/vec/common/sort/sorter.cpp:
##########
@@ -17,48 +17,154 @@
 
 #include "vec/common/sort/sorter.h"
 
+#include "runtime/block_spill_manager.h"
 #include "runtime/thread_context.h"
 
 namespace doris::vectorized {
 
-void MergeSorterState::build_merge_tree(SortDescription& sort_description) {
-    for (const auto& block : sorted_blocks) {
-        cursors.emplace_back(block, sort_description);
+// When doing spillable sorting, each sorted block is spilled into a single file.
+//
+// In order to decrease memory pressure when merging
+// multiple spilled blocks into one bigger sorted block, only part
+// of each spilled blocks are read back into memory at a time.
+//
+// Currently the spilled blocks are splitted into small sub blocks,
+// each sub block is serialized in PBlock format and appended
+// to the spill file.
+//
+// This number specifies the maximum size of sub blocks
+static constexpr int BLOCK_SPILL_BATCH_BYTES = 8 * 1024 * 1024;
+
+Status MergeSorterState::add_sorted_block(Block& block) {
+    auto rows = block.rows();
+    if (0 == rows) {
+        return Status::OK();
+    }
+    if (0 == avg_row_bytes_) {
+        avg_row_bytes_ = block.bytes() / rows;
+        spill_block_batch_size_ = (BLOCK_SPILL_BATCH_BYTES + avg_row_bytes_ - 1) / avg_row_bytes_;
     }
 
-    if (sorted_blocks.size() > 1) {
-        for (auto& cursor : cursors) priority_queue.push(MergeSortCursor(&cursor));
+    auto bytes_used = data_size();
+    if (is_spilled_ ||
+        (config::external_sort_bytes_threshold > 0 &&
+         (bytes_used + block.allocated_bytes()) >=
+                 (config::external_sort_bytes_threshold - BLOCK_SPILL_BATCH_BYTES))) {
+        is_spilled_ = true;
+        BlockSpillWriterUPtr spill_block_wirter;
+        RETURN_IF_ERROR(ExecEnv::GetInstance()->block_spill_mgr()->get_writer(
+                spill_block_batch_size_, spill_block_wirter));
+
+        RETURN_IF_ERROR(spill_block_wirter->write(block));
+        spilled_sorted_block_streams_.emplace_back(spill_block_wirter->get_id());

Review Comment:
   This is too small: BUFFERED_BLOCK_BYTES is 16MB, so there may be a lot of small files.


[GitHub] [doris] yiguolei commented on a diff in pull request #15624: [feature](spill to disk) Support spill to disk for sort node

Posted by GitBox <gi...@apache.org>.
yiguolei commented on code in PR #15624:
URL: https://github.com/apache/doris/pull/15624#discussion_r1062192996


##########
be/src/common/config.h:
##########
@@ -874,6 +874,12 @@ CONF_Int32(pipeline_executor_size, "0");
 // Will remove after fully test.
 CONF_Bool(enable_index_apply_preds_except_leafnode_of_andnode, "false");
 
+// If the memory consumption of sort node exceed this limit, will trigger spill to disk;
+// Set to 0 to disable; defaul to 1G; min: 128M
+CONF_mInt64(external_sort_bytes_threshold, "1073741824");

Review Comment:
   Please also add this config to the FE fuzzy mode. The fuzzy value could be chosen from 0 (disabled), 1 (almost always enabled), 1MB, and 100GB.


[GitHub] [doris] hello-stephen commented on pull request #15624: [feature](spill to disk) Support spill to disk for sort node

Posted by GitBox <gi...@apache.org>.
hello-stephen commented on PR #15624:
URL: https://github.com/apache/doris/pull/15624#issuecomment-1372597735

   TeamCity pipeline, clickbench performance test result:
    the sum of best hot time: 36.27 seconds
    load time: 468 seconds
    storage size: 17121638989 Bytes
    https://doris-community-test-1308700295.cos.ap-hongkong.myqcloud.com/tmp/20230105184339_clickbench_pr_74457.html


[GitHub] [doris] yiguolei commented on a diff in pull request #15624: [feature](spill to disk) Support spill to disk for sort node

Posted by GitBox <gi...@apache.org>.
yiguolei commented on code in PR #15624:
URL: https://github.com/apache/doris/pull/15624#discussion_r1062178611


##########
be/src/runtime/block_spill_manager.cpp:
##########
@@ -21,16 +21,28 @@
 #include <boost/uuid/uuid_io.hpp>
 #include <random>
 
+#include "util/file_utils.h"
+
 namespace doris {
+static const std::string BLOCK_SPILL_TMP_DIR = "spill";
 BlockSpillManager::BlockSpillManager(const std::vector<StorePath>& paths) : _store_paths(paths) {}
 
+Status BlockSpillManager::init() {
+    for (const auto& path : _store_paths) {
+        std::string dir = path.path + "/" + BLOCK_SPILL_TMP_DIR;
+        if (!FileUtils::check_exist(dir)) {
+            RETURN_IF_ERROR(FileUtils::create_dir(dir));
+        }
+    }
+    return Status::OK();
+}
 Status BlockSpillManager::get_writer(int32_t batch_size, vectorized::BlockSpillWriterUPtr& writer) {
     int64_t id;
     std::vector<int> indices(_store_paths.size());
     std::iota(indices.begin(), indices.end(), 0);
     std::shuffle(indices.begin(), indices.end(), std::mt19937 {std::random_device {}()});
 
-    std::string path = _store_paths[indices[0]].path;
+    std::string path = _store_paths[indices[0]].path + "/" + BLOCK_SPILL_TMP_DIR;

Review Comment:
   If the BE restarts while spilling to disk, garbage files will be left behind.
   We could add a timestamp prefix here,
   then add GC logic that clears all folders with a timestamp earlier than the current one.
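
Here is one possible shape for the timestamp-prefix GC described above, sketched with C++17 `std::filesystem`. The directory naming scheme `<start_ts>_<query_id>` is hypothetical, and so are the helpers `spill_dir_name`, `parse_ts_prefix`, and `gc_stale_spill_dirs`. The idea is that each BE process records its start timestamp once. At startup, it removes every spill directory whose prefix is older than that timestamp.

```cpp
#include <cstddef>
#include <cstdint>
#include <filesystem>
#include <string>
#include <system_error>
#include <vector>

namespace fs = std::filesystem;

// Hypothetical naming scheme: "<process_start_ts>_<query_id>".
std::string spill_dir_name(int64_t process_start_ts, const std::string& query_id) {
    return std::to_string(process_start_ts) + "_" + query_id;
}

// Returns the numeric prefix before the first '_', or -1 if there is none.
int64_t parse_ts_prefix(const std::string& name) {
    auto pos = name.find('_');
    if (pos == std::string::npos || pos == 0) {
        return -1;
    }
    int64_t ts = 0;
    for (std::size_t i = 0; i < pos; ++i) {
        if (name[i] < '0' || name[i] > '9') {
            return -1;
        }
        ts = ts * 10 + (name[i] - '0');
    }
    return ts;
}

// Removes spill directories created by an earlier BE process, i.e. those
// whose timestamp prefix is older than `current_start_ts`. Directories
// without a timestamp prefix are left alone. Returns how many were removed.
int gc_stale_spill_dirs(const fs::path& spill_root, int64_t current_start_ts) {
    std::vector<fs::path> stale;
    std::error_code ec;
    // Collect first, then delete, so the directory is not modified mid-iteration.
    for (const auto& entry : fs::directory_iterator(spill_root, ec)) {
        if (!entry.is_directory(ec)) {
            continue;
        }
        int64_t ts = parse_ts_prefix(entry.path().filename().string());
        if (ts >= 0 && ts < current_start_ts) {
            stale.push_back(entry.path());
        }
    }
    int removed = 0;
    for (const auto& dir : stale) {
        fs::remove_all(dir, ec);
        if (!ec) {
            ++removed;
        }
    }
    return removed;
}
```

Because the cutoff is the current process's own start time, directories created by the running BE are never touched, even if several queries are spilling concurrently.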


[GitHub] [doris] yiguolei commented on a diff in pull request #15624: [feature](spill to disk) Support spill to disk for sort node

Posted by GitBox <gi...@apache.org>.
yiguolei commented on code in PR #15624:
URL: https://github.com/apache/doris/pull/15624#discussion_r1062199064


##########
be/src/vec/common/sort/sorter.cpp:
##########
@@ -17,48 +17,154 @@
 
 #include "vec/common/sort/sorter.h"
 
+#include "runtime/block_spill_manager.h"
 #include "runtime/thread_context.h"
 
 namespace doris::vectorized {
 
-void MergeSorterState::build_merge_tree(SortDescription& sort_description) {
-    for (const auto& block : sorted_blocks) {
-        cursors.emplace_back(block, sort_description);
+// When doing spillable sorting, each sorted block is spilled into a single file.
+//
+// In order to decrease memory pressure when merging
+// multiple spilled blocks into one bigger sorted block, only part
+// of each spilled blocks are read back into memory at a time.
+//
+// Currently the spilled blocks are splitted into small sub blocks,
+// each sub block is serialized in PBlock format and appended
+// to the spill file.
+//
+// This number specifies the maximum size of sub blocks
+static constexpr int BLOCK_SPILL_BATCH_BYTES = 8 * 1024 * 1024;
+
+Status MergeSorterState::add_sorted_block(Block& block) {
+    auto rows = block.rows();
+    if (0 == rows) {
+        return Status::OK();
+    }
+    if (0 == avg_row_bytes_) {
+        avg_row_bytes_ = block.bytes() / rows;
+        spill_block_batch_size_ = (BLOCK_SPILL_BATCH_BYTES + avg_row_bytes_ - 1) / avg_row_bytes_;
     }
 
-    if (sorted_blocks.size() > 1) {
-        for (auto& cursor : cursors) priority_queue.push(MergeSortCursor(&cursor));
+    auto bytes_used = data_size();
+    if (is_spilled_ ||
+        (config::external_sort_bytes_threshold > 0 &&
+         (bytes_used + block.allocated_bytes()) >=
+                 (config::external_sort_bytes_threshold - BLOCK_SPILL_BATCH_BYTES))) {
+        is_spilled_ = true;
+        BlockSpillWriterUPtr spill_block_wirter;
+        RETURN_IF_ERROR(ExecEnv::GetInstance()->block_spill_mgr()->get_writer(
+                spill_block_batch_size_, spill_block_wirter));
+
+        RETURN_IF_ERROR(spill_block_wirter->write(block));
+        spilled_sorted_block_streams_.emplace_back(spill_block_wirter->get_id());
+
+        spill_block_wirter->close();
+
+        if (init_merge_sorted_block_) {
+            init_merge_sorted_block_ = false;
+            merge_sorted_block_ = block.clone_empty();
+        }
+    } else {
+        sorted_blocks_.emplace_back(std::move(block));
     }
+    num_rows_ += rows;
+    return Status::OK();
+}
+
+Status MergeSorterState::_build_merge_tree_not_spilled(const SortDescription& sort_description) {
+    for (const auto& block : sorted_blocks_) {
+        cursors_.emplace_back(block, sort_description);
+    }
+
+    if (sorted_blocks_.size() > 1) {
+        for (auto& cursor : cursors_) priority_queue_.push(MergeSortCursor(&cursor));
+    }
+    return Status::OK();
+}
+
+Status MergeSorterState::build_merge_tree(const SortDescription& sort_description) {
+    _build_merge_tree_not_spilled(sort_description);
+
+    if (spilled_sorted_block_streams_.size() > 0) {
+        if (sorted_blocks_.size() > 0) {
+            BlockSpillWriterUPtr spill_block_wirter;
+            RETURN_IF_ERROR(ExecEnv::GetInstance()->block_spill_mgr()->get_writer(
+                    spill_block_batch_size_, spill_block_wirter));
+
+            LOG(WARNING) << "spill to disk: build merge tree, in memory sorted blocks: "
+                         << sorted_blocks_.size();
+            if (sorted_blocks_.size() == 1) {
+                RETURN_IF_ERROR(spill_block_wirter->write(sorted_blocks_[0]));
+            } else {
+                bool eos = false;
+
+                // merge blocks in memory and write merge result to disk
+                while (!eos) {
+                    merge_sorted_block_.clear_column_data();
+                    _merge_sort_read_not_spilled(spill_block_batch_size_, &merge_sorted_block_,
+                                                 &eos);
+                    RETURN_IF_ERROR(spill_block_wirter->write(merge_sorted_block_));
+                }
+            }
+            spilled_sorted_block_streams_.emplace_back(spill_block_wirter->get_id());
+            spill_block_wirter->close();
+        }
+        RETURN_IF_ERROR(_merge_spilled_blocks(sort_description));
+    }
+    return Status::OK();
 }
 
 Status MergeSorterState::merge_sort_read(doris::RuntimeState* state,
                                          doris::vectorized::Block* block, bool* eos) {
-    size_t num_columns = sorted_blocks[0].columns();
+    Status status = Status::OK();
+    if (is_spilled_) {
+        status = merger_->get_next(block, eos);
+        LOG(WARNING) << "spill to disk: merger_->get_next, eos: " << *eos;
+    } else {
+        LOG(WARNING) << "spill to disk: _merge_sort_read_not_spilled, eos: " << *eos;
+        if (sorted_blocks_.empty()) {
+            *eos = true;
+        } else if (sorted_blocks_.size() == 1) {
+            if (offset_ != 0) {
+                sorted_blocks_[0].skip_num_rows(offset_);
+            }
+            block->swap(sorted_blocks_[0]);
+            *eos = true;
+        } else {
+            status = _merge_sort_read_not_spilled(state->batch_size(), block, eos);
+        }
+    }
+    return status;
+}
+
+Status MergeSorterState::_merge_sort_read_not_spilled(int batch_size,
+                                                      doris::vectorized::Block* block, bool* eos) {

Review Comment:
   Can this method ever return a non-OK status? I noticed that some callers don't check the status.
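
In the hunk above, `build_merge_tree` discards the results of `_build_merge_tree_not_spilled(...)` and `_merge_sort_read_not_spilled(...)`. One way to make such omissions visible at compile time is to mark the status type `[[nodiscard]]`, so that GCC and Clang warn when a returned status is ignored. The sketch below assumes a simplified stand-in for `doris::Status`. The class, the macro name `RETURN_IF_ERROR_SKETCH`, and `merge_step` are all hypothetical.

```cpp
#include <string>
#include <utility>

// Hypothetical minimal Status, standing in for doris::Status.
// [[nodiscard]] on the class makes the compiler warn whenever a function
// returning Status is called and its result is silently dropped.
class [[nodiscard]] Status {
public:
    static Status OK() { return Status(true, ""); }
    static Status InternalError(std::string msg) { return Status(false, std::move(msg)); }
    bool ok() const { return ok_; }
    const std::string& msg() const { return msg_; }

private:
    Status(bool ok, std::string msg) : ok_(ok), msg_(std::move(msg)) {}
    bool ok_;
    std::string msg_;
};

// Propagate a non-OK status to the caller (same idea as RETURN_IF_ERROR).
#define RETURN_IF_ERROR_SKETCH(expr)     \
    do {                                 \
        Status st_sketch_ = (expr);      \
        if (!st_sketch_.ok()) {          \
            return st_sketch_;           \
        }                                \
    } while (false)

// Hypothetical merge step that can fail.
Status merge_step(int batch_size, bool* eos) {
    if (batch_size <= 0) {
        return Status::InternalError("invalid batch size");
    }
    *eos = true;
    return Status::OK();
}

Status build_merge_tree_sketch(int batch_size) {
    bool eos = false;
    while (!eos) {
        // A bare `merge_step(batch_size, &eos);` here would draw a
        // nodiscard warning; propagating the status avoids it.
        RETURN_IF_ERROR_SKETCH(merge_step(batch_size, &eos));
    }
    return Status::OK();
}
```

If the method can never fail, the alternative is to make it return `void`, so callers are not tempted to ignore a status that carries no information.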


[GitHub] [doris] yiguolei commented on a diff in pull request #15624: [feature](spill to disk) Support spill to disk for sort node

Posted by GitBox <gi...@apache.org>.
yiguolei commented on code in PR #15624:
URL: https://github.com/apache/doris/pull/15624#discussion_r1062206515


##########
be/src/vec/common/sort/sorter.cpp:
##########
@@ -67,13 +173,71 @@ Status MergeSorterState::merge_sort_read(doris::RuntimeState* state,
     }
 
     if (!mem_reuse) {
-        Block merge_block = sorted_blocks[0].clone_with_columns(std::move(merged_columns));
+        Block merge_block = sorted_blocks_[0].clone_with_columns(std::move(merged_columns));
         merge_block.swap(*block);
     }
 
     return Status::OK();
 }
 
+int MergeSorterState::_calc_spill_blocks_to_merge() const {
+    return config::external_sort_bytes_threshold / BLOCK_SPILL_BATCH_BYTES;
+}
+
+// merge all the intermediate spilled blocks
+Status MergeSorterState::_merge_spilled_blocks(const SortDescription& sort_description) {
+    int num_of_blocks_to_merge = _calc_spill_blocks_to_merge();
+    while (true) {
+        // pick some spilled blocks to merge, and spill the merged result
+        // to disk, until all splled blocks can be merged in a run.
+        RETURN_IF_ERROR(_create_intermediate_merger(num_of_blocks_to_merge, sort_description));
+        if (spilled_sorted_block_streams_.empty()) {
+            LOG(WARNING) << "spill to disk: _merge_spilled_blocks, can merger all spilled block in "
+                            "memory";
+            break;
+        }
+
+        bool eos = false;
+
+        BlockSpillWriterUPtr spill_block_wirter;
+        RETURN_IF_ERROR(ExecEnv::GetInstance()->block_spill_mgr()->get_writer(
+                spill_block_batch_size_, spill_block_wirter));
+
+        while (!eos) {
+            merge_sorted_block_.clear_column_data();
+            RETURN_IF_ERROR(merger_->get_next(&merge_sorted_block_, &eos));
+            RETURN_IF_ERROR(spill_block_wirter->write(merge_sorted_block_));
+        }
+        spilled_sorted_block_streams_.emplace_back(spill_block_wirter->get_id());
+        spill_block_wirter->close();
+    }
+    return Status::OK();
+}
+
+Status MergeSorterState::_create_intermediate_merger(int num_blocks,
+                                                     const SortDescription& sort_description) {
+    _reset_block_spill_readers();
+
+    std::vector<BlockSupplier> child_block_suppliers;
+    merger_.reset(new VSortedRunMerger(sort_description, spill_block_batch_size_, limit_, offset_,
+                                       profile_));
+
+    for (int i = 0; i < num_blocks && !spilled_sorted_block_streams_.empty(); ++i) {
+        auto stream_id = spilled_sorted_block_streams_.front();
+        BlockSpillReaderUPtr spilled_block_reader;

Review Comment:
   The reader is never closed? Maybe we should call close() automatically in the destructor.
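
`_merge_spilled_blocks` above performs a multi-pass external merge. It repeatedly merges up to `_calc_spill_blocks_to_merge()` runs and re-spills the result. It stops once the remaining runs fit in a single merge. With the default `external_sort_bytes_threshold` of 1073741824 bytes and `BLOCK_SPILL_BATCH_BYTES` of 8388608 bytes, that fan-in is 1073741824 / 8388608 = 128.

The sketch below reproduces the same control flow on in-memory runs, using a min-heap for each k-way merge. A `std::vector<int>` stands in for one spilled file, and all names are hypothetical.

```cpp
#include <cstddef>
#include <deque>
#include <functional>
#include <iterator>
#include <queue>
#include <tuple>
#include <utility>
#include <vector>

using Run = std::vector<int>;  // stands in for one sorted spill file

// K-way merge of sorted runs with a min-heap of (value, run index, position).
Run kway_merge(const std::vector<Run>& runs) {
    using Item = std::tuple<int, std::size_t, std::size_t>;
    std::priority_queue<Item, std::vector<Item>, std::greater<Item>> heap;
    for (std::size_t r = 0; r < runs.size(); ++r) {
        if (!runs[r].empty()) {
            heap.emplace(runs[r][0], r, 0);
        }
    }
    Run out;
    while (!heap.empty()) {
        auto [value, r, i] = heap.top();  // copy before pop invalidates top()
        heap.pop();
        out.push_back(value);
        if (i + 1 < runs[r].size()) {
            heap.emplace(runs[r][i + 1], r, i + 1);
        }
    }
    return out;
}

// While more than `fan_in` runs remain, merge the oldest `fan_in` runs and
// append the result at the back (the analogue of re-spilling it to disk).
// `intermediate_merges` reports how many such extra passes were needed.
Run multi_pass_merge(std::deque<Run> runs, std::size_t fan_in, int* intermediate_merges) {
    if (fan_in < 2) {
        fan_in = 2;  // a fan-in of 1 would never reduce the number of runs
    }
    *intermediate_merges = 0;
    while (runs.size() > fan_in) {
        std::vector<Run> batch;
        for (std::size_t i = 0; i < fan_in; ++i) {
            batch.push_back(std::move(runs.front()));
            runs.pop_front();
        }
        runs.push_back(kway_merge(batch));
        ++*intermediate_merges;
    }
    // Final merge: everything left fits in one pass.
    return kway_merge(std::vector<Run>(std::make_move_iterator(runs.begin()),
                                       std::make_move_iterator(runs.end())));
}
```

With a fan-in of 128, a query that produced at most 128 spill files needs no intermediate pass. Beyond that, each extra pass rereads and rewrites some data, which is why a very small spill file size, as raised earlier in this thread, can be costly.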


[GitHub] [doris] yiguolei commented on a diff in pull request #15624: [feature](spill to disk) Support spill to disk for sort node

Posted by GitBox <gi...@apache.org>.
yiguolei commented on code in PR #15624:
URL: https://github.com/apache/doris/pull/15624#discussion_r1062216903


##########
be/src/vec/common/sort/sorter.cpp:
##########
@@ -17,48 +17,154 @@
 
 #include "vec/common/sort/sorter.h"
 
+#include "runtime/block_spill_manager.h"
 #include "runtime/thread_context.h"
 
 namespace doris::vectorized {
 
-void MergeSorterState::build_merge_tree(SortDescription& sort_description) {
-    for (const auto& block : sorted_blocks) {
-        cursors.emplace_back(block, sort_description);
+// When doing spillable sorting, each sorted block is spilled into a single file.
+//
+// In order to decrease memory pressure when merging
+// multiple spilled blocks into one bigger sorted block, only part
+// of each spilled blocks are read back into memory at a time.
+//
+// Currently the spilled blocks are splitted into small sub blocks,
+// each sub block is serialized in PBlock format and appended
+// to the spill file.
+//
+// This number specifies the maximum size of sub blocks
+static constexpr int BLOCK_SPILL_BATCH_BYTES = 8 * 1024 * 1024;
+
+Status MergeSorterState::add_sorted_block(Block& block) {
+    auto rows = block.rows();
+    if (0 == rows) {
+        return Status::OK();
+    }
+    if (0 == avg_row_bytes_) {
+        avg_row_bytes_ = block.bytes() / rows;
+        spill_block_batch_size_ = (BLOCK_SPILL_BATCH_BYTES + avg_row_bytes_ - 1) / avg_row_bytes_;
     }
 
-    if (sorted_blocks.size() > 1) {
-        for (auto& cursor : cursors) priority_queue.push(MergeSortCursor(&cursor));
+    auto bytes_used = data_size();
+    if (is_spilled_ ||
+        (config::external_sort_bytes_threshold > 0 &&
+         (bytes_used + block.allocated_bytes()) >=
+                 (config::external_sort_bytes_threshold - BLOCK_SPILL_BATCH_BYTES))) {
+        is_spilled_ = true;
+        BlockSpillWriterUPtr spill_block_wirter;
+        RETURN_IF_ERROR(ExecEnv::GetInstance()->block_spill_mgr()->get_writer(
+                spill_block_batch_size_, spill_block_wirter));
+
+        RETURN_IF_ERROR(spill_block_wirter->write(block));
+        spilled_sorted_block_streams_.emplace_back(spill_block_wirter->get_id());
+
+        spill_block_wirter->close();
+
+        if (init_merge_sorted_block_) {
+            init_merge_sorted_block_ = false;
+            merge_sorted_block_ = block.clone_empty();
+        }
+    } else {
+        sorted_blocks_.emplace_back(std::move(block));
     }
+    num_rows_ += rows;
+    return Status::OK();
+}
+
+Status MergeSorterState::_build_merge_tree_not_spilled(const SortDescription& sort_description) {
+    for (const auto& block : sorted_blocks_) {
+        cursors_.emplace_back(block, sort_description);
+    }
+
+    if (sorted_blocks_.size() > 1) {
+        for (auto& cursor : cursors_) priority_queue_.push(MergeSortCursor(&cursor));
+    }
+    return Status::OK();
+}
+
+Status MergeSorterState::build_merge_tree(const SortDescription& sort_description) {
+    _build_merge_tree_not_spilled(sort_description);
+
+    if (spilled_sorted_block_streams_.size() > 0) {
+        if (sorted_blocks_.size() > 0) {
+            BlockSpillWriterUPtr spill_block_wirter;
+            RETURN_IF_ERROR(ExecEnv::GetInstance()->block_spill_mgr()->get_writer(
+                    spill_block_batch_size_, spill_block_wirter));
+
+            LOG(WARNING) << "spill to disk: build merge tree, in memory sorted blocks: "
+                         << sorted_blocks_.size();
+            if (sorted_blocks_.size() == 1) {
+                RETURN_IF_ERROR(spill_block_wirter->write(sorted_blocks_[0]));
+            } else {
+                bool eos = false;
+
+                // merge blocks in memory and write merge result to disk
+                while (!eos) {
+                    merge_sorted_block_.clear_column_data();
+                    _merge_sort_read_not_spilled(spill_block_batch_size_, &merge_sorted_block_,
+                                                 &eos);
+                    RETURN_IF_ERROR(spill_block_wirter->write(merge_sorted_block_));

Review Comment:
   The writer is not closed.
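
If the writer class itself cannot be changed, a scope guard is one way to make sure `close()` runs on every path, including early returns through `RETURN_IF_ERROR`. Here is a minimal sketch with a hypothetical `ScopeGuard` and a fake writer; neither is a Doris type.

```cpp
#include <functional>
#include <utility>
#include <vector>

// Hypothetical "defer" helper: runs a callback when the enclosing scope exits,
// whether by normal fallthrough or by an early return.
class ScopeGuard {
public:
    explicit ScopeGuard(std::function<void()> fn) : fn_(std::move(fn)) {}
    ScopeGuard(const ScopeGuard&) = delete;
    ScopeGuard& operator=(const ScopeGuard&) = delete;
    ~ScopeGuard() {
        if (fn_) {
            fn_();
        }
    }

private:
    std::function<void()> fn_;
};

// Hypothetical writer with an explicit close(), like the spill writer above.
struct FakeWriter {
    bool closed = false;
    int writes = 0;
    bool write(int batch) {
        ++writes;
        return batch >= 0;  // negative batch simulates a write failure
    }
    void close() { closed = true; }
};

// Writes batches and stops at the first failure. The guard closes the
// writer on both the success path and the early-return path.
bool spill_all(FakeWriter& writer, const std::vector<int>& batches) {
    ScopeGuard close_writer([&writer] { writer.close(); });
    for (int batch : batches) {
        if (!writer.write(batch)) {
            return false;  // early return: guard still fires
        }
    }
    return true;
}
```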


[GitHub] [doris] yiguolei commented on a diff in pull request #15624: [feature](spill to disk) Support spill to disk for sort node

Posted by GitBox <gi...@apache.org>.
yiguolei commented on code in PR #15624:
URL: https://github.com/apache/doris/pull/15624#discussion_r1062144227


##########
be/src/vec/common/sort/sorter.cpp:
##########
@@ -17,48 +17,154 @@
 
 #include "vec/common/sort/sorter.h"
 
+#include "runtime/block_spill_manager.h"
 #include "runtime/thread_context.h"
 
 namespace doris::vectorized {
 
-void MergeSorterState::build_merge_tree(SortDescription& sort_description) {
-    for (const auto& block : sorted_blocks) {
-        cursors.emplace_back(block, sort_description);
+// When doing spillable sorting, each sorted block is spilled into a single file.
+//
+// In order to decrease memory pressure when merging
+// multiple spilled blocks into one bigger sorted block, only part
+// of each spilled blocks are read back into memory at a time.
+//
+// Currently the spilled blocks are splitted into small sub blocks,
+// each sub block is serialized in PBlock format and appended
+// to the spill file.
+//
+// This number specifies the maximum size of sub blocks
+static constexpr int BLOCK_SPILL_BATCH_BYTES = 8 * 1024 * 1024;
+
+Status MergeSorterState::add_sorted_block(Block& block) {
+    auto rows = block.rows();
+    if (0 == rows) {
+        return Status::OK();
+    }
+    if (0 == avg_row_bytes_) {
+        avg_row_bytes_ = block.bytes() / rows;
+        spill_block_batch_size_ = (BLOCK_SPILL_BATCH_BYTES + avg_row_bytes_ - 1) / avg_row_bytes_;
     }
 
-    if (sorted_blocks.size() > 1) {
-        for (auto& cursor : cursors) priority_queue.push(MergeSortCursor(&cursor));
+    auto bytes_used = data_size();
+    if (is_spilled_ ||
+        (config::external_sort_bytes_threshold > 0 &&
+         (bytes_used + block.allocated_bytes()) >=
+                 (config::external_sort_bytes_threshold - BLOCK_SPILL_BATCH_BYTES))) {
+        is_spilled_ = true;
+        BlockSpillWriterUPtr spill_block_wirter;
+        RETURN_IF_ERROR(ExecEnv::GetInstance()->block_spill_mgr()->get_writer(
+                spill_block_batch_size_, spill_block_wirter));
+
+        RETURN_IF_ERROR(spill_block_wirter->write(block));

Review Comment:
   Typo: `wirter` should be `writer`.


[GitHub] [doris] yiguolei commented on a diff in pull request #15624: [feature](spill to disk) Support spill to disk for sort node

Posted by GitBox <gi...@apache.org>.
yiguolei commented on code in PR #15624:
URL: https://github.com/apache/doris/pull/15624#discussion_r1064252944


##########
fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java:
##########
@@ -665,6 +672,21 @@ public void initFuzzyModeVariables() {
         this.partitionedHashJoinRowsThreshold = random.nextBoolean() ? 8 : 1048576;
         this.enableShareHashTableForBroadcastJoin = random.nextBoolean();
         this.rewriteOrToInPredicateThreshold = random.nextInt(100) + 2;
+        int randomInt = random.nextInt(4);
+        switch (randomInt) {
+            case 0:
+                this.externalSortBytesThreshold = 0;
+                break;
+            case 1:
+                this.externalSortBytesThreshold = 1;
+                break;
+            case 2:
+                this.externalSortBytesThreshold = 1024 * 1024;

Review Comment:
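   This fuzzy setting can never actually trigger spilling. On the BE, values smaller than 128MB don't take effect, so only the 100G case would apply here, and in practice the pipeline will never reach it.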
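
Here is a small sketch of the arithmetic behind this comment. It assumes the BE treats any nonzero threshold below 128 MiB as having no effect, as the comment and the `min: 128M` note in `config.h` suggest. This thread does not show whether the BE disables spilling or clamps such values, so the sketch models them as disabled. Under that assumption, only 100 GiB enables spilling among the fuzzy values 0, 1, 1 MiB, and 100 GiB. As the comment notes, 100 GiB is too large to be reached in the actual pipeline. All names are hypothetical.

```cpp
#include <cstdint>
#include <vector>

constexpr int64_t kMiB = 1024 * 1024;
constexpr int64_t kMinEffectiveThreshold = 128 * kMiB;  // "min: 128M" per config.h comment
constexpr int64_t kBatchBytes = 8 * kMiB;               // BLOCK_SPILL_BATCH_BYTES

// Assumption (from the review comment, not verified against BE code):
// a threshold of 0, or any nonzero value below 128 MiB, leaves spilling off.
bool spill_enabled(int64_t threshold) {
    return threshold >= kMinEffectiveThreshold;
}

// `bytes_after_add` corresponds to `bytes_used + block.allocated_bytes()` in
// the diff, which spills once it reaches `threshold - BLOCK_SPILL_BATCH_BYTES`.
bool would_spill(int64_t threshold, int64_t bytes_after_add) {
    return spill_enabled(threshold) && bytes_after_add >= threshold - kBatchBytes;
}
```

A fuzzy set that actually exercises spilling would need at least one value just above 128 MiB, small enough for ordinary regression queries to exceed.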


[GitHub] [doris] github-actions[bot] commented on a diff in pull request #15624: [feature](spill to disk) Support spill to disk for sort node

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on code in PR #15624:
URL: https://github.com/apache/doris/pull/15624#discussion_r1061467388


##########
be/src/vec/common/sort/sorter.cpp:
##########
@@ -17,48 +17,154 @@
 
 #include "vec/common/sort/sorter.h"
 
+#include "runtime/block_spill_manager.h"
 #include "runtime/thread_context.h"
 
 namespace doris::vectorized {
 
-void MergeSorterState::build_merge_tree(SortDescription& sort_description) {
-    for (const auto& block : sorted_blocks) {
-        cursors.emplace_back(block, sort_description);
+// When doing spillable sorting, each sorted block is spilled into a single file.
+//
+// In order to decrease memory pressure when merging
+// multiple spilled blocks into one bigger sorted block, only part
+// of each spilled blocks are read back into memory at a time.
+//
+// Currently the spilled blocks are splitted into small sub blocks,
+// each sub block is serialized in PBlock format and appended
+// to the spill file.
+//
+// This number specifies the maximum size of sub blocks
+static constexpr int BLOCK_SPILL_BATCH_BYTES = 8 * 1024 * 1024;
+
+Status MergeSorterState::add_sorted_block(Block& block) {
+    auto rows = block.rows();
+    if (0 == rows) {
+        return Status::OK();
+    }
+    if (0 == avg_row_bytes_) {
+        avg_row_bytes_ = block.bytes() / rows;
+        spill_block_batch_size_ = (BLOCK_SPILL_BATCH_BYTES + avg_row_bytes_ - 1) / avg_row_bytes_;
     }
 
-    if (sorted_blocks.size() > 1) {
-        for (auto& cursor : cursors) priority_queue.push(MergeSortCursor(&cursor));
+    auto bytes_used = data_size();
+    if (is_spilled_ ||
+        (config::external_sort_bytes_threshold > 0 &&
+         (bytes_used + block.allocated_bytes()) >=
+                 (config::external_sort_bytes_threshold - BLOCK_SPILL_BATCH_BYTES))) {
+        is_spilled_ = true;
+        BlockSpillWriterUPtr spill_block_wirter;
+        RETURN_IF_ERROR(ExecEnv::GetInstance()->block_spill_mgr()->get_writer(
+                spill_block_batch_size_, spill_block_wirter));
+
+        RETURN_IF_ERROR(spill_block_wirter->write(block));
+        spilled_sorted_block_streams_.emplace_back(spill_block_wirter->get_id());
+
+        spill_block_wirter->close();
+
+        if (init_merge_sorted_block_) {
+            init_merge_sorted_block_ = false;
+            merge_sorted_block_ = block.clone_empty();
+        }
+    } else {
+        sorted_blocks_.emplace_back(std::move(block));
     }
+    num_rows_ += rows;
+    return Status::OK();
+}
+
+Status MergeSorterState::_build_merge_tree_not_spilled(const SortDescription& sort_description) {
+    for (const auto& block : sorted_blocks_) {
+        cursors_.emplace_back(block, sort_description);
+    }
+
+    if (sorted_blocks_.size() > 1) {
+        for (auto& cursor : cursors_) priority_queue_.push(MergeSortCursor(&cursor));

Review Comment:
   warning: statement should be inside braces [readability-braces-around-statements]
   
   ```suggestion
           for (auto& cursor : cursors_) {
               priority_queue_.push(MergeSortCursor(&cursor));
           }
   ```
   



##########
be/src/vec/common/sort/sorter.cpp:
##########
@@ -17,48 +17,154 @@
 
 #include "vec/common/sort/sorter.h"
 
+#include "runtime/block_spill_manager.h"
 #include "runtime/thread_context.h"
 
 namespace doris::vectorized {
 
-void MergeSorterState::build_merge_tree(SortDescription& sort_description) {
-    for (const auto& block : sorted_blocks) {
-        cursors.emplace_back(block, sort_description);
+// When doing spillable sorting, each sorted block is spilled into a single file.
+//
+// In order to decrease memory pressure when merging
+// multiple spilled blocks into one bigger sorted block, only part
+// of each spilled blocks are read back into memory at a time.
+//
+// Currently the spilled blocks are splitted into small sub blocks,
+// each sub block is serialized in PBlock format and appended
+// to the spill file.
+//
+// This number specifies the maximum size of sub blocks
+static constexpr int BLOCK_SPILL_BATCH_BYTES = 8 * 1024 * 1024;
+
+Status MergeSorterState::add_sorted_block(Block& block) {
+    auto rows = block.rows();
+    if (0 == rows) {
+        return Status::OK();
+    }
+    if (0 == avg_row_bytes_) {
+        avg_row_bytes_ = block.bytes() / rows;
+        spill_block_batch_size_ = (BLOCK_SPILL_BATCH_BYTES + avg_row_bytes_ - 1) / avg_row_bytes_;
     }
 
-    if (sorted_blocks.size() > 1) {
-        for (auto& cursor : cursors) priority_queue.push(MergeSortCursor(&cursor));
+    auto bytes_used = data_size();
+    if (is_spilled_ ||
+        (config::external_sort_bytes_threshold > 0 &&
+         (bytes_used + block.allocated_bytes()) >=
+                 (config::external_sort_bytes_threshold - BLOCK_SPILL_BATCH_BYTES))) {
+        is_spilled_ = true;
+        BlockSpillWriterUPtr spill_block_writer;
+        RETURN_IF_ERROR(ExecEnv::GetInstance()->block_spill_mgr()->get_writer(
+                spill_block_batch_size_, spill_block_writer));
+
+        RETURN_IF_ERROR(spill_block_writer->write(block));
+        spilled_sorted_block_streams_.emplace_back(spill_block_writer->get_id());
+
+        spill_block_writer->close();
+
+        if (init_merge_sorted_block_) {
+            init_merge_sorted_block_ = false;
+            merge_sorted_block_ = block.clone_empty();
+        }
+    } else {
+        sorted_blocks_.emplace_back(std::move(block));
     }
+    num_rows_ += rows;
+    return Status::OK();
+}
+
+Status MergeSorterState::_build_merge_tree_not_spilled(const SortDescription& sort_description) {
+    for (const auto& block : sorted_blocks_) {
+        cursors_.emplace_back(block, sort_description);
+    }
+
+    if (sorted_blocks_.size() > 1) {
+        for (auto& cursor : cursors_) priority_queue_.push(MergeSortCursor(&cursor));
+    }
+    return Status::OK();
+}
+
+Status MergeSorterState::build_merge_tree(const SortDescription& sort_description) {
+    RETURN_IF_ERROR(_build_merge_tree_not_spilled(sort_description));
+
+    if (spilled_sorted_block_streams_.size() > 0) {
+        if (sorted_blocks_.size() > 0) {
+            BlockSpillWriterUPtr spill_block_writer;
+            RETURN_IF_ERROR(ExecEnv::GetInstance()->block_spill_mgr()->get_writer(
+                    spill_block_batch_size_, spill_block_writer));
+
+            LOG(WARNING) << "spill to disk: build merge tree, in memory sorted blocks: "
+                         << sorted_blocks_.size();
+            if (sorted_blocks_.size() == 1) {
+                RETURN_IF_ERROR(spill_block_writer->write(sorted_blocks_[0]));
+            } else {
+                bool eos = false;
+
+                // merge the in-memory blocks and write the merged result to disk
+                while (!eos) {
+                    merge_sorted_block_.clear_column_data();
+                    RETURN_IF_ERROR(_merge_sort_read_not_spilled(spill_block_batch_size_,
+                                                                 &merge_sorted_block_, &eos));
+                    RETURN_IF_ERROR(spill_block_writer->write(merge_sorted_block_));
+                }
+            }
+            spilled_sorted_block_streams_.emplace_back(spill_block_writer->get_id());
+            spill_block_writer->close();
+        }
+        RETURN_IF_ERROR(_merge_spilled_blocks(sort_description));
+    }
+    return Status::OK();
 }
 
 Status MergeSorterState::merge_sort_read(doris::RuntimeState* state,
                                          doris::vectorized::Block* block, bool* eos) {
-    size_t num_columns = sorted_blocks[0].columns();
+    Status status = Status::OK();
+    if (is_spilled_) {
+        status = merger_->get_next(block, eos);
+        LOG(WARNING) << "spill to disk: merger_->get_next, eos: " << *eos;
+    } else {
+        LOG(WARNING) << "spill to disk: _merge_sort_read_not_spilled, eos: " << *eos;
+        if (sorted_blocks_.empty()) {
+            *eos = true;
+        } else if (sorted_blocks_.size() == 1) {
+            if (offset_ != 0) {
+                sorted_blocks_[0].skip_num_rows(offset_);
+            }
+            block->swap(sorted_blocks_[0]);
+            *eos = true;
+        } else {
+            status = _merge_sort_read_not_spilled(state->batch_size(), block, eos);
+        }
+    }
+    return status;
+}
+
+Status MergeSorterState::_merge_sort_read_not_spilled(int batch_size,
+                                                      doris::vectorized::Block* block, bool* eos) {
+    size_t num_columns = sorted_blocks_[0].columns();
 
     bool mem_reuse = block->mem_reuse();
     MutableColumns merged_columns =
-            mem_reuse ? block->mutate_columns() : sorted_blocks[0].clone_empty_columns();
+            mem_reuse ? block->mutate_columns() : sorted_blocks_[0].clone_empty_columns();
 
     /// Take rows from queue in right order and push to 'merged'.
     size_t merged_rows = 0;
-    while (!priority_queue.empty()) {
-        auto current = priority_queue.top();
-        priority_queue.pop();
+    while (!priority_queue_.empty()) {
+        auto current = priority_queue_.top();
+        priority_queue_.pop();
 
-        if (_offset == 0) {
+        if (offset_ == 0) {
             for (size_t i = 0; i < num_columns; ++i)
                 merged_columns[i]->insert_from(*current->all_columns[i], current->pos);
             ++merged_rows;
         } else {
-            _offset--;
+            offset_--;
         }
 
         if (!current->isLast()) {
             current->next();
-            priority_queue.push(current);
+            priority_queue_.push(current);
         }
 
-        if (merged_rows == state->batch_size()) break;
+        if (merged_rows == batch_size) break;

Review Comment:
   warning: statement should be inside braces [readability-braces-around-statements]
   
   ```suggestion
           if (merged_rows == batch_size) {
               break;
           }
   ```
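The sub-block sizing described in the comment at the top of the diff can be modeled in isolation. This is a minimal, hypothetical C++ sketch, not Doris code: `kSpillBatchBytes`, `spill_batch_rows` and `split_into_sub_blocks` are illustrative names, and rows are represented as index ranges rather than `Block` columns.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Mirrors BLOCK_SPILL_BATCH_BYTES from the diff: target bytes per spilled sub-block.
constexpr std::size_t kSpillBatchBytes = 8 * 1024 * 1024;

// Rows per sub-block: ceil(kSpillBatchBytes / avg_row_bytes), the same rounding used for
// spill_block_batch_size_ in add_sorted_block(). The clamp to at least 1 byte is an
// assumption added here so the division can never be by zero; the diff does not clamp.
std::size_t spill_batch_rows(std::size_t avg_row_bytes) {
    avg_row_bytes = std::max<std::size_t>(avg_row_bytes, 1);
    return (kSpillBatchBytes + avg_row_bytes - 1) / avg_row_bytes;
}

// Splits rows [0, total_rows) into half-open [begin, end) ranges of at most batch_rows rows.
// Each range stands for one sub-block that would be serialized and appended to the spill file.
std::vector<std::pair<std::size_t, std::size_t>> split_into_sub_blocks(std::size_t total_rows,
                                                                       std::size_t batch_rows) {
    assert(batch_rows > 0);
    std::vector<std::pair<std::size_t, std::size_t>> ranges;
    for (std::size_t begin = 0; begin < total_rows; begin += batch_rows) {
        ranges.emplace_back(begin, std::min(begin + batch_rows, total_rows));
    }
    return ranges;
}
```

Because the row count is derived from an average, a sub-block whose rows are wider than average can end up somewhat larger than 8 MiB. Also, `block.bytes() / rows` is integer division, so it could truncate to 0 for very narrow rows, in which case the expression in the diff would divide by zero; a clamp like the one above is one way to guard against that.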
   

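The spill decision in `add_sorted_block()` reduces to a small predicate. The sketch below assumes `config::external_sort_bytes_threshold` behaves like a signed 64-bit byte count; `should_spill` and its parameters are hypothetical names used only for illustration.

```cpp
#include <cassert>
#include <cstdint>

// Mirrors BLOCK_SPILL_BATCH_BYTES from the diff.
constexpr std::int64_t kSpillBatchBytes = 8LL * 1024 * 1024;

// Decides whether an incoming sorted block goes to disk, following the condition in
// add_sorted_block(): spilling is sticky, a non-positive threshold disables it, and one
// spill batch of headroom is kept below the threshold.
bool should_spill(bool already_spilled, std::int64_t bytes_in_memory,
                  std::int64_t incoming_bytes, std::int64_t threshold) {
    assert(bytes_in_memory >= 0 && incoming_bytes >= 0);
    if (already_spilled) {
        return true;
    }
    if (threshold <= 0) {
        return false;
    }
    return bytes_in_memory + incoming_bytes >= threshold - kSpillBatchBytes;
}
```

Two consequences follow from the condition: once one block has been spilled, every later block is spilled too, and (assuming a signed threshold) a threshold at or below 8 MiB makes the right-hand side non-positive, so every block would be spilled.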

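The in-memory path (`_build_merge_tree_not_spilled` plus `_merge_sort_read_not_spilled`) is a k-way merge over a priority queue of cursors: `offset_` rows are discarded first, and each call returns at most `batch_size` rows. Below is a simplified model over sorted `std::vector<int>` runs; `Cursor`, `build_merge_queue` and `merge_read` are hypothetical names, not the Doris `MergeSortCursor` API.

```cpp
#include <cassert>
#include <cstddef>
#include <queue>
#include <vector>

// Cursor into one sorted run; a simplified stand-in for MergeSortCursor over a Block.
struct Cursor {
    const std::vector<int>* run;  // the run must outlive the queue
    std::size_t pos;
    int value() const { return (*run)[pos]; }
    bool is_last() const { return pos + 1 == run->size(); }
};

// std::priority_queue is a max-heap, so "greater" ordering makes top() the smallest value.
struct GreaterByValue {
    bool operator()(const Cursor& a, const Cursor& b) const { return a.value() > b.value(); }
};

using MergeQueue = std::priority_queue<Cursor, std::vector<Cursor>, GreaterByValue>;

// One cursor per non-empty run, like _build_merge_tree_not_spilled().
MergeQueue build_merge_queue(const std::vector<std::vector<int>>& runs) {
    MergeQueue queue;
    for (const auto& run : runs) {
        if (!run.empty()) {
            queue.push(Cursor{&run, 0});
        }
    }
    return queue;
}

// Pops rows in ascending order. The first `offset` rows are discarded (offset is
// decremented in place, like offset_), and at most batch_size rows are returned per call.
std::vector<int> merge_read(MergeQueue& queue, std::size_t& offset, std::size_t batch_size,
                            bool* eos) {
    assert(batch_size > 0);
    std::vector<int> out;
    while (!queue.empty()) {
        Cursor current = queue.top();
        queue.pop();
        if (offset == 0) {
            out.push_back(current.value());
        } else {
            --offset;
        }
        if (!current.is_last()) {
            ++current.pos;
            queue.push(current);
        }
        if (out.size() == batch_size) {
            break;
        }
    }
    *eos = queue.empty();
    return out;
}
```

In this sketch `*eos` is set as soon as the queue drains, so the final non-empty batch already reports end of stream. The tail of the diff's function is not shown, so the real code may set `eos` differently.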

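The comment at the top of the diff explains why spilled blocks are written as sub-blocks: during the final merge, only part of each spilled run needs to be in memory at a time. The following in-memory simulation illustrates that idea, assuming each spill file is a sequence of sorted sub-blocks; `SpilledRun`, `RunReader` and `merge_spilled` are hypothetical and do not model the `merger_` used by `_merge_spilled_blocks()`.

```cpp
#include <cassert>
#include <cstddef>
#include <queue>
#include <vector>

// One spill file modeled as a sorted run stored as consecutive sub-blocks.
// (In the diff each sub-block is a serialized PBlock on disk; here it is a vector.)
using SpilledRun = std::vector<std::vector<int>>;

// Walks a run one sub-block at a time; only the current sub-block needs to be "loaded".
class RunReader {
public:
    explicit RunReader(const SpilledRun* run) : run_(run) { skip_empty(); }
    bool has_row() const { return sub_ < run_->size(); }
    int row() const {
        assert(has_row());
        return (*run_)[sub_][row_];
    }
    void advance() {
        if (++row_ == (*run_)[sub_].size()) {  // current sub-block exhausted:
            ++sub_;                            // this is where the next one would be read
            row_ = 0;
            skip_empty();
        }
    }

private:
    void skip_empty() {
        while (sub_ < run_->size() && (*run_)[sub_].empty()) {
            ++sub_;
        }
    }
    const SpilledRun* run_;
    std::size_t sub_ = 0;
    std::size_t row_ = 0;
};

// K-way merge across all runs using a min-heap of reader indices.
std::vector<int> merge_spilled(const std::vector<SpilledRun>& runs) {
    std::vector<RunReader> readers;
    readers.reserve(runs.size());
    for (const auto& run : runs) {
        readers.emplace_back(&run);
    }
    // An index is popped before its reader advances, so heap keys never change in place.
    auto greater = [&readers](std::size_t a, std::size_t b) {
        return readers[a].row() > readers[b].row();
    };
    std::priority_queue<std::size_t, std::vector<std::size_t>, decltype(greater)> heap(greater);
    for (std::size_t i = 0; i < readers.size(); ++i) {
        if (readers[i].has_row()) {
            heap.push(i);
        }
    }
    std::vector<int> out;
    while (!heap.empty()) {
        std::size_t i = heap.top();
        heap.pop();
        out.push_back(readers[i].row());
        readers[i].advance();
        if (readers[i].has_row()) {
            heap.push(i);
        }
    }
    return out;
}
```

With this layout, the memory needed during the merge is bounded by roughly one sub-block per spilled run rather than by the total size of all runs.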


[GitHub] [doris] yiguolei commented on a diff in pull request #15624: [feature](spill to disk) Support spill to disk for sort node

Posted by GitBox <gi...@apache.org>.
yiguolei commented on code in PR #15624:
URL: https://github.com/apache/doris/pull/15624#discussion_r1062186323


##########
be/src/vec/common/sort/sorter.cpp:
##########
@@ -17,48 +17,154 @@
 
 #include "vec/common/sort/sorter.h"
 
+#include "runtime/block_spill_manager.h"
 #include "runtime/thread_context.h"
 
 namespace doris::vectorized {
 
-void MergeSorterState::build_merge_tree(SortDescription& sort_description) {
-    for (const auto& block : sorted_blocks) {
-        cursors.emplace_back(block, sort_description);
+// When doing spillable sorting, each sorted block is spilled into a single file.
+//
+// In order to decrease memory pressure when merging
+// multiple spilled blocks into one bigger sorted block, only part
+// of each spilled blocks are read back into memory at a time.
+//
+// Currently the spilled blocks are splitted into small sub blocks,
+// each sub block is serialized in PBlock format and appended
+// to the spill file.
+//
+// This number specifies the maximum size of sub blocks
+static constexpr int BLOCK_SPILL_BATCH_BYTES = 8 * 1024 * 1024;
+
+Status MergeSorterState::add_sorted_block(Block& block) {
+    auto rows = block.rows();
+    if (0 == rows) {
+        return Status::OK();
+    }
+    if (0 == avg_row_bytes_) {
+        avg_row_bytes_ = block.bytes() / rows;
+        spill_block_batch_size_ = (BLOCK_SPILL_BATCH_BYTES + avg_row_bytes_ - 1) / avg_row_bytes_;
     }
 
-    if (sorted_blocks.size() > 1) {
-        for (auto& cursor : cursors) priority_queue.push(MergeSortCursor(&cursor));
+    auto bytes_used = data_size();
+    if (is_spilled_ ||
+        (config::external_sort_bytes_threshold > 0 &&
+         (bytes_used + block.allocated_bytes()) >=
+                 (config::external_sort_bytes_threshold - BLOCK_SPILL_BATCH_BYTES))) {
+        is_spilled_ = true;
+        BlockSpillWriterUPtr spill_block_wirter;
+        RETURN_IF_ERROR(ExecEnv::GetInstance()->block_spill_mgr()->get_writer(
+                spill_block_batch_size_, spill_block_wirter));
+
+        RETURN_IF_ERROR(spill_block_wirter->write(block));
+        spilled_sorted_block_streams_.emplace_back(spill_block_wirter->get_id());
+
+        spill_block_wirter->close();
+
+        if (init_merge_sorted_block_) {
+            init_merge_sorted_block_ = false;
+            merge_sorted_block_ = block.clone_empty();
+        }
+    } else {
+        sorted_blocks_.emplace_back(std::move(block));
     }
+    num_rows_ += rows;
+    return Status::OK();
+}
+
+Status MergeSorterState::_build_merge_tree_not_spilled(const SortDescription& sort_description) {
+    for (const auto& block : sorted_blocks_) {
+        cursors_.emplace_back(block, sort_description);
+    }
+
+    if (sorted_blocks_.size() > 1) {
+        for (auto& cursor : cursors_) priority_queue_.push(MergeSortCursor(&cursor));
+    }
+    return Status::OK();
+}
+
+Status MergeSorterState::build_merge_tree(const SortDescription& sort_description) {
+    _build_merge_tree_not_spilled(sort_description);

Review Comment:
   if (spilled_sorted_block_streams_.size() == 0) {
       RETURN_IF_ERROR(_build_merge_tree_not_spilled(sort_description));
   } else {
       xxxxx;
   }

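The reviewer's suggestion is to take exactly one path in `build_merge_tree()` instead of always building the in-memory merge tree first. In the diff, the spilled branch still uses the in-memory priority queue to merge `sorted_blocks_` before writing them out, so in the sketch below that work is folded into `spill_in_memory_blocks()`. `Status`, `RETURN_IF_ERROR` and `SorterSketch` here are hypothetical minimal stand-ins, not Doris's definitions; the point is only the control flow and that every returned status is checked.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical minimal stand-in for doris::Status (not the real class).
struct Status {
    std::string error;  // empty string means OK
    static Status OK() { return Status{}; }
    static Status Error(std::string msg) { return Status{std::move(msg)}; }
    bool ok() const { return error.empty(); }
};

// Hypothetical stand-in for RETURN_IF_ERROR: return early with the first non-OK status.
#define RETURN_IF_ERROR(stmt)       \
    do {                            \
        Status status_tmp = (stmt); \
        if (!status_tmp.ok()) {     \
            return status_tmp;      \
        }                           \
    } while (false)

struct SorterSketch {
    std::vector<int> in_memory_blocks;  // stands in for sorted_blocks_
    std::vector<int> spilled_streams;   // stands in for spilled_sorted_block_streams_
    bool fail_spill = false;            // injects an error to show propagation
    std::vector<std::string> steps;     // records which steps ran

    Status build_in_memory_tree() {
        steps.push_back("in-memory");
        return Status::OK();
    }
    Status spill_in_memory_blocks() {
        steps.push_back("spill-in-memory");
        return fail_spill ? Status::Error("injected failure") : Status::OK();
    }
    Status merge_spilled() {
        steps.push_back("merge-spilled");
        return Status::OK();
    }

    // One path or the other, never both, and no Status is silently dropped.
    Status build_merge_tree() {
        if (spilled_streams.empty()) {
            RETURN_IF_ERROR(build_in_memory_tree());
        } else {
            if (!in_memory_blocks.empty()) {
                RETURN_IF_ERROR(spill_in_memory_blocks());
            }
            RETURN_IF_ERROR(merge_spilled());
        }
        return Status::OK();
    }
};
```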

