Posted to commits@doris.apache.org by "jacktengg (via GitHub)" <gi...@apache.org> on 2023/06/21 01:47:00 UTC

[GitHub] [doris] jacktengg opened a new pull request, #21052: [feature](spill) support spill to disk for pipeline engine

jacktengg opened a new pull request, #21052:
URL: https://github.com/apache/doris/pull/21052

   ## Proposed changes
   
   Issue Number: close #xxx
   
   <!--Describe your changes.-->
   
   ## Further comments
   
   If this is a relatively large or complex change, kick off the discussion at [dev@doris.apache.org](mailto:dev@doris.apache.org) by explaining why you chose the solution you did and what alternatives you considered, etc...
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] github-actions[bot] commented on pull request #21052: [feature](spill) support spill to disk for pipeline engine

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #21052:
URL: https://github.com/apache/doris/pull/21052#issuecomment-1599951080

   clang-tidy review says "All clean, LGTM! :+1:"




Re: [PR] [feature](spill) support spill to disk for pipeline engine [doris]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on code in PR #21052:
URL: https://github.com/apache/doris/pull/21052#discussion_r1415148483


##########
be/src/exec/data_sink.h:
##########
@@ -98,6 +98,10 @@ class DataSink {
         _query_statistics = statistics;
     }
 
+    virtual size_t revokable_mem_size() const { return 0; }

Review Comment:
   warning: function 'revokable_mem_size' should be marked [[nodiscard]] [modernize-use-nodiscard]
   
   ```suggestion
       [[nodiscard]] virtual size_t revokable_mem_size() const { return 0; }
   ```
   



##########
be/src/exec/exec_node.h:
##########
@@ -21,6 +21,7 @@
 #pragma once
 
 #include <gen_cpp/PlanNodes_types.h>

Review Comment:
   warning: 'gen_cpp/PlanNodes_types.h' file not found [clang-diagnostic-error]
   ```cpp
   #include <gen_cpp/PlanNodes_types.h>
            ^
   ```
   



##########
be/src/exec/exec_node.h:
##########
@@ -196,6 +197,10 @@
 
     virtual void prepare_for_next() {}
 
+    virtual size_t revokable_mem_size() const { return 0; }

Review Comment:
   warning: function 'revokable_mem_size' should be marked [[nodiscard]] [modernize-use-nodiscard]
   
   ```suggestion
       [[nodiscard]] virtual size_t revokable_mem_size() const { return 0; }
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -297,6 +303,10 @@
 
     [[nodiscard]] RuntimeProfile* get_runtime_profile() const override { return _sink->profile(); }
 
+    size_t revokable_mem_size() const override { return _sink->revokable_mem_size(); }

Review Comment:
   warning: function 'revokable_mem_size' should be marked [[nodiscard]] [modernize-use-nodiscard]
   
   ```suggestion
       [[nodiscard]] size_t revokable_mem_size() const override { return _sink->revokable_mem_size(); }
   ```
   



##########
be/src/pipeline/exec/hashjoin_build_sink.h:
##########
@@ -42,5 +43,20 @@ class HashJoinBuildSink final : public StreamingOperator<HashJoinBuildSinkBuilde
     bool is_pending_finish() const override { return !_node->ready_for_finish(); }
 };
 
+class GraceHashJoinBuildSinkBuilder final : public OperatorBuilder<vectorized::GraceHashJoinNode> {
+public:
+    GraceHashJoinBuildSinkBuilder(int32_t, ExecNode*);
+
+    OperatorPtr build_operator() override;
+    bool is_sink() const override { return true; }

Review Comment:
   warning: function 'is_sink' should be marked [[nodiscard]] [modernize-use-nodiscard]
   
   ```suggestion
       [[nodiscard]] bool is_sink() const override { return true; }
   ```
   



##########
be/src/pipeline/exec/empty_source_operator.h:
##########
@@ -82,6 +82,10 @@ class EmptySourceOperator final : public OperatorBase {
         return _exec_node->runtime_profile();
     }
 
+    size_t revokable_mem_size() const override { return 0; }

Review Comment:
   warning: function 'revokable_mem_size' should be marked [[nodiscard]] [modernize-use-nodiscard]
   
   ```suggestion
       [[nodiscard]] size_t revokable_mem_size() const override { return 0; }
   ```
   



##########
be/src/runtime/runtime_state.h:
##########
@@ -413,6 +413,25 @@
         return _query_options.__isset.enable_insert_strict && _query_options.enable_insert_strict;
     }
 
+    bool enable_join_spill() const {
+        return _query_options.__isset.enable_join_spill && _query_options.enable_join_spill;
+    }
+
+    bool enable_sort_spill() const {

Review Comment:
   warning: function 'enable_sort_spill' should be marked [[nodiscard]] [modernize-use-nodiscard]
   
   ```suggestion
       [[nodiscard]] bool enable_sort_spill() const {
   ```
   



##########
be/src/runtime/runtime_state.h:
##########
@@ -413,6 +413,25 @@
         return _query_options.__isset.enable_insert_strict && _query_options.enable_insert_strict;
     }
 
+    bool enable_join_spill() const {
+        return _query_options.__isset.enable_join_spill && _query_options.enable_join_spill;
+    }
+
+    bool enable_sort_spill() const {
+        return _query_options.__isset.enable_sort_spill && _query_options.enable_sort_spill;
+    }
+
+    bool enable_agg_spill() const {
+        return _query_options.__isset.enable_agg_spill && _query_options.enable_agg_spill;
+    }
+
+    int64_t min_revokable_mem() const {

Review Comment:
   warning: function 'min_revokable_mem' should be marked [[nodiscard]] [modernize-use-nodiscard]
   
   ```suggestion
       [[nodiscard]] int64_t min_revokable_mem() const {
   ```
   



##########
be/src/vec/exec/join/grace_hash_join_node.h:
##########
@@ -0,0 +1,325 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <atomic>
+#include <cstdint>
+#include <memory>
+#include <mutex>
+
+#include "common/status.h"
+#include "util/countdown_latch.h"
+#include "vec/exec/join/vhash_join_node.h"
+#include "vec/spill/spill_stream.h"
+
+namespace doris {
+namespace vectorized {
+
+using InMemoryHashJoinNodeUPtr = std::unique_ptr<HashJoinNode>;
+
+class GraceHashJoinNode;
+
+class JoinPartition {
+public:
+    JoinPartition(GraceHashJoinNode* parent, int level) : parent_(parent), level_(level) {}
+    Status prepare(RuntimeState* state, RuntimeProfile* profile, const std::string& operator_name,
+                   int node_id);
+
+    Status add_build_rows(Block* block, const std::vector<int>& rows, bool eos);
+
+    Status add_probe_rows(RuntimeState* state, Block* block, std::vector<int>& rows, bool eos);
+
+    Status push_probe_block(RuntimeState* state, Block* input_block, bool eos);
+
+    Status probe(RuntimeState* state, vectorized::Block* output_block, bool* eos) {
+        return in_mem_hash_join_node_->pull(state, output_block, eos);
+    }
+
+    Status build_eos() { return build_stream_->done_write(); }
+
+    Status probe_eos() { return probe_stream_->done_write(); }
+
+    Status spilled_probe_not_repartitioned(RuntimeState* state, Block* output_block, bool* eos) {
+        bool partition_eos = false;
+        if (need_more_probe_data()) {
+            Block block;
+            RETURN_IF_ERROR(probe_stream_->get_next(&block, &partition_eos));
+            RETURN_IF_ERROR(in_mem_hash_join_node_->push(state, &block, partition_eos));
+        }
+        return probe(state, output_block, eos);
+    }
+
+    Status get_next_probe_block(RuntimeState* state, Block* output_block, bool* eos) {
+        return probe_stream_->get_next(output_block, eos);
+    }
+    bool has_nex_probe_block() { return probe_stream_->has_next(); }
+
+    Status unpin_build_stream();
+
+    Status unpin_probe_stream();
+
+    bool is_build_partition_spilled() const { return build_stream_->is_spilled(); }
+
+    bool is_probe_partition_spilled() const { return probe_stream_->is_spilled(); }
+
+    bool is_ready_for_probe() const { return is_ready_for_probe_; }

Review Comment:
   warning: function 'is_ready_for_probe' should be marked [[nodiscard]] [modernize-use-nodiscard]
   
   ```suggestion
       [[nodiscard]] bool is_ready_for_probe() const { return is_ready_for_probe_; }
   ```
   



##########
be/src/vec/exec/join/grace_hash_join_node.h:
##########
@@ -0,0 +1,325 @@
+    bool is_probe_partition_spilled() const { return probe_stream_->is_spilled(); }

Review Comment:
   warning: function 'is_probe_partition_spilled' should be marked [[nodiscard]] [modernize-use-nodiscard]
   
   ```suggestion
       [[nodiscard]] bool is_probe_partition_spilled() const { return probe_stream_->is_spilled(); }
   ```
   



##########
be/src/vec/common/sort/sorter.h:
##########
@@ -88,7 +91,7 @@ class MergeSorterState {
 
     bool is_spilled() const { return is_spilled_; }
 
-    const Block& last_sorted_block() const { return sorted_blocks_.back(); }
+    Block last_sorted_block() const { return sorted_blocks_.back(); }

Review Comment:
   warning: function 'last_sorted_block' should be marked [[nodiscard]] [modernize-use-nodiscard]
   
   ```suggestion
       [[nodiscard]] Block last_sorted_block() const { return sorted_blocks_.back(); }
   ```
   



##########
be/src/io/fs/local_file_writer.h:
##########
@@ -40,6 +40,8 @@ class LocalFileWriter final : public FileWriter {
     Status write_at(size_t offset, const Slice& data) override;
     Status finalize() override;
 
+    int get_fd() const { return _fd; }

Review Comment:
   warning: function 'get_fd' should be marked [[nodiscard]] [modernize-use-nodiscard]
   
   ```suggestion
       [[nodiscard]] int get_fd() const { return _fd; }
   ```
   



##########
be/src/pipeline/exec/spill_sort_sink_operator.h:
##########
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include "operator.h"
+#include "vec/exec/spill_sort_node.h"
+
+namespace doris {
+class ExecNode;
+
+namespace pipeline {
+
+class SpillSortSinkOperatorBuilder final : public OperatorBuilder<vectorized::SpillSortNode> {
+public:
+    SpillSortSinkOperatorBuilder(int32_t id, ExecNode* sort_node);
+
+    bool is_sink() const override { return true; }

Review Comment:
   warning: function 'is_sink' should be marked [[nodiscard]] [modernize-use-nodiscard]
   
   ```suggestion
       [[nodiscard]] bool is_sink() const override { return true; }
   ```
   



##########
be/src/runtime/runtime_state.h:
##########
@@ -413,6 +413,25 @@ class RuntimeState {
         return _query_options.__isset.enable_insert_strict && _query_options.enable_insert_strict;
     }
 
+    bool enable_join_spill() const {

Review Comment:
   warning: function 'enable_join_spill' should be marked [[nodiscard]] [modernize-use-nodiscard]
   
   ```suggestion
       [[nodiscard]] bool enable_join_spill() const {
   ```
   



##########
be/src/pipeline/exec/spill_sort_source_operator.h:
##########
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include "common/status.h"
+#include "operator.h"
+#include "vec/exec/spill_sort_node.h"
+
+namespace doris {
+class ExecNode;
+class RuntimeState;
+
+namespace pipeline {
+
+class SpillSortSourceOperatorBuilder final : public OperatorBuilder<vectorized::SpillSortNode> {
+public:
+    SpillSortSourceOperatorBuilder(int32_t id, ExecNode* sort_node);
+
+    bool is_source() const override { return true; }

Review Comment:
   warning: function 'is_source' should be marked [[nodiscard]] [modernize-use-nodiscard]
   
   ```suggestion
       [[nodiscard]] bool is_source() const override { return true; }
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -244,6 +246,10 @@ class OperatorBase {
 
     [[nodiscard]] virtual RuntimeProfile* get_runtime_profile() const = 0;
 
+    virtual size_t revokable_mem_size() const { return 0; }

Review Comment:
   warning: function 'revokable_mem_size' should be marked [[nodiscard]] [modernize-use-nodiscard]
   
   ```suggestion
       [[nodiscard]] virtual size_t revokable_mem_size() const { return 0; }
   ```
   



##########
be/src/runtime/runtime_state.h:
##########
@@ -413,6 +413,25 @@
         return _query_options.__isset.enable_insert_strict && _query_options.enable_insert_strict;
     }
 
+    bool enable_join_spill() const {
+        return _query_options.__isset.enable_join_spill && _query_options.enable_join_spill;
+    }
+
+    bool enable_sort_spill() const {
+        return _query_options.__isset.enable_sort_spill && _query_options.enable_sort_spill;
+    }
+
+    bool enable_agg_spill() const {

Review Comment:
   warning: function 'enable_agg_spill' should be marked [[nodiscard]] [modernize-use-nodiscard]
   
   ```suggestion
       [[nodiscard]] bool enable_agg_spill() const {
   ```
   



##########
be/src/vec/exec/join/grace_hash_join_node.h:
##########
@@ -0,0 +1,325 @@
+    bool is_build_partition_spilled() const { return build_stream_->is_spilled(); }

Review Comment:
   warning: function 'is_build_partition_spilled' should be marked [[nodiscard]] [modernize-use-nodiscard]
   
   ```suggestion
       [[nodiscard]] bool is_build_partition_spilled() const { return build_stream_->is_spilled(); }
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -364,6 +374,10 @@
         return _node->runtime_profile();
     }
 
+    size_t revokable_mem_size() const override { return _node->revokable_mem_size(); }

Review Comment:
   warning: function 'revokable_mem_size' should be marked [[nodiscard]] [modernize-use-nodiscard]
   
   ```suggestion
       [[nodiscard]] size_t revokable_mem_size() const override { return _node->revokable_mem_size(); }
   ```
   



##########
be/src/vec/common/sort/sorter.h:
##########
@@ -161,10 +164,20 @@
 
     virtual bool is_spilled() const { return false; }
 
+    bool is_append_block_oom() const { return _is_append_block_oom; }

Review Comment:
   warning: function 'is_append_block_oom' should be marked [[nodiscard]] [modernize-use-nodiscard]
   
   ```suggestion
       [[nodiscard]] bool is_append_block_oom() const { return _is_append_block_oom; }
   ```
   



##########
be/src/vec/exec/join/grace_hash_join_node.h:
##########
@@ -0,0 +1,325 @@
+    bool current_probe_finished() const { return in_mem_hash_join_node_->current_probe_finished(); }

Review Comment:
   warning: function 'current_probe_finished' should be marked [[nodiscard]] [modernize-use-nodiscard]
   
   ```suggestion
       [[nodiscard]] bool current_probe_finished() const { return in_mem_hash_join_node_->current_probe_finished(); }
   ```
   



##########
be/src/vec/exec/join/grace_hash_join_node.h:
##########
@@ -0,0 +1,325 @@
+    bool is_processed() const { return is_processed_; }

Review Comment:
   warning: function 'is_processed' should be marked [[nodiscard]] [modernize-use-nodiscard]
   
   ```suggestion
       [[nodiscard]] bool is_processed() const { return is_processed_; }
   ```
   



##########
be/src/vec/exec/join/grace_hash_join_node.h:
##########
@@ -0,0 +1,325 @@
+    bool need_more_probe_data() const { return in_mem_hash_join_node_->need_more_input_data(); }

Review Comment:
   warning: function 'need_more_probe_data' should be marked [[nodiscard]] [modernize-use-nodiscard]
   
   ```suggestion
       [[nodiscard]] bool need_more_probe_data() const { return in_mem_hash_join_node_->need_more_input_data(); }
   ```
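
   For context on the check: `[[nodiscard]]` (C++17) makes the compiler warn when a caller ignores a function's return value. That is why clang-tidy flags pure accessors such as `need_more_probe_data()`, where calling the function and ignoring the result is almost certainly a bug. The sketch below shows an accessor marked as suggested and a caller that consumes its result. It uses a hypothetical `PartitionState` type, not the PR's `JoinPartition`, and the `drain` loop is illustrative only.

   ```cpp
   #include <cassert>
   #include <cstddef>

   // Hypothetical stand-in for a join partition; not the PR's JoinPartition.
   class PartitionState {
   public:
       explicit PartitionState(std::size_t pending_blocks) : pending_blocks_(pending_blocks) {}

       // Marked [[nodiscard]] as clang-tidy suggests: a bare statement such as
       // `p.need_more_probe_data();` now produces a compiler warning.
       [[nodiscard]] bool need_more_probe_data() const { return pending_blocks_ > 0; }

       // Simulates feeding one probe block to the in-memory join.
       void consume_probe_block() {
           assert(pending_blocks_ > 0);
           --pending_blocks_;
       }

   private:
       std::size_t pending_blocks_;
   };

   // Drains the partition, using the accessor's result to drive the loop.
   inline std::size_t drain(PartitionState& p) {
       std::size_t consumed = 0;
       while (p.need_more_probe_data()) {
           p.consume_probe_block();
           ++consumed;
       }
       return consumed;
   }
   ```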
   



##########
be/src/vec/exec/join/grace_hash_join_node.h:
##########
@@ -0,0 +1,325 @@
+
+    bool current_probe_finished() const { return in_mem_hash_join_node_->current_probe_finished(); }
+
+    bool is_processed() const { return is_processed_; }
+    void set_is_processed() { is_processed_ = true; }
+
+    void close(RuntimeState* state) {
+        close_build(state);
+        close_probe();
+    }
+    void close_build(RuntimeState* state);
+    void close_probe();
+
+    size_t build_data_bytes() const { return build_data_bytes_; }

Review Comment:
   warning: function 'build_data_bytes' should be marked [[nodiscard]] [modernize-use-nodiscard]
   
   ```suggestion
       [[nodiscard]] size_t build_data_bytes() const { return build_data_bytes_; }
   ```
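
   A spill policy typically reads byte counters like `build_data_bytes()` when deciding what to write to disk. This excerpt does not show the PR's actual policy. Purely as an illustration, the sketch below implements one common heuristic: pick the largest build partition that is still in memory as the spill victim. `PartitionStats` and `pick_spill_victim` are hypothetical names, not part of the PR.

   ```cpp
   #include <cassert>
   #include <cstddef>
   #include <optional>
   #include <vector>

   // Hypothetical per-partition summary; not a type from the PR.
   struct PartitionStats {
       std::size_t build_data_bytes = 0;
       bool build_spilled = false;
   };

   // Returns the index of the largest in-memory build partition, or std::nullopt
   // if every partition has already been spilled (or the list is empty).
   [[nodiscard]] inline std::optional<std::size_t> pick_spill_victim(
           const std::vector<PartitionStats>& parts) {
       std::optional<std::size_t> victim;
       for (std::size_t i = 0; i < parts.size(); ++i) {
           if (parts[i].build_spilled) {
               continue;  // already on disk; spilling it again frees no memory
           }
           if (!victim || parts[i].build_data_bytes > parts[*victim].build_data_bytes) {
               victim = i;
           }
       }
       return victim;
   }
   ```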
   



##########
be/src/vec/exec/join/grace_hash_join_node.h:
##########
@@ -0,0 +1,325 @@
+    size_t build_data_bytes() const { return build_data_bytes_; }
+    size_t probe_data_bytes() const { return probe_data_bytes_; }

Review Comment:
   warning: function 'probe_data_bytes' should be marked [[nodiscard]] [modernize-use-nodiscard]
   
   ```suggestion
       [[nodiscard]] size_t probe_data_bytes() const { return probe_data_bytes_; }
   ```
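
   In this header, `add_build_rows` and `add_probe_rows` take a block plus a `std::vector<int>` of row indices. This suggests that the caller first routes each row of an input block to a partition. The sketch below shows that routing step under two assumptions. First, the caller already has a 64-bit hash per row. Second, the partition is chosen by re-mixing that hash with the recursion `level` (the value passed to the `JoinPartition` constructor). The PR's real hash function is not shown in this excerpt. `mix64` (a splitmix64-style finalizer) and `partition_rows` are hypothetical names.

   ```cpp
   #include <cassert>
   #include <cstddef>
   #include <cstdint>
   #include <vector>

   // Hypothetical mixer (splitmix64 finalizer); not the PR's hash function.
   inline std::uint64_t mix64(std::uint64_t x) {
       x ^= x >> 30;
       x *= 0xbf58476d1ce4e5b9ULL;
       x ^= x >> 27;
       x *= 0x94d049bb133111ebULL;
       x ^= x >> 31;
       return x;
   }

   // Splits the row indices of one block into per-partition lists, the shape a
   // call like add_build_rows(block, rows, eos) consumes. Seeding with `level`
   // means rows that shared a partition at one level are likely redistributed
   // when that partition is repartitioned at level + 1, instead of all mapping
   // to the same child again.
   inline std::vector<std::vector<int>> partition_rows(const std::vector<std::uint64_t>& row_hashes,
                                                       std::size_t num_partitions, int level) {
       assert(num_partitions > 0);
       std::vector<std::vector<int>> rows(num_partitions);
       const std::uint64_t seed = static_cast<std::uint64_t>(level) * 0x9e3779b97f4a7c15ULL;
       for (std::size_t i = 0; i < row_hashes.size(); ++i) {
           const std::uint64_t h = mix64(row_hashes[i] + seed);  // unsigned wraparound is well-defined
           rows[h % num_partitions].push_back(static_cast<int>(i));
       }
       return rows;
   }
   ```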
   


