You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by kx...@apache.org on 2023/06/07 16:13:02 UTC
[doris] 08/13: [Bug](memleak) Fix emptyoperator may cause node not close (#20525)
This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0-beta
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 4654fe0a748db8696ab7b8d8774290f1b55fec90
Author: wangbo <wa...@apache.org>
AuthorDate: Wed Jun 7 01:27:13 2023 +0800
[Bug](memleak) Fix emptyoperator may cause node not close (#20525)
---
be/src/pipeline/exec/empty_source_operator.cpp | 2 +-
be/src/pipeline/exec/empty_source_operator.h | 19 ++++++++++++++++---
be/src/pipeline/pipeline_fragment_context.cpp | 2 +-
3 files changed, 18 insertions(+), 5 deletions(-)
diff --git a/be/src/pipeline/exec/empty_source_operator.cpp b/be/src/pipeline/exec/empty_source_operator.cpp
index 5142c0c55a..78f5c94662 100644
--- a/be/src/pipeline/exec/empty_source_operator.cpp
+++ b/be/src/pipeline/exec/empty_source_operator.cpp
@@ -21,7 +21,7 @@
namespace doris::pipeline {
OperatorPtr EmptySourceOperatorBuilder::build_operator() {
- return std::make_shared<EmptySourceOperator>(this);
+ return std::make_shared<EmptySourceOperator>(this, _exec_node);
}
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/empty_source_operator.h b/be/src/pipeline/exec/empty_source_operator.h
index fd12b27dab..4d93a310df 100644
--- a/be/src/pipeline/exec/empty_source_operator.h
+++ b/be/src/pipeline/exec/empty_source_operator.h
@@ -37,8 +37,10 @@ namespace doris::pipeline {
class EmptySourceOperatorBuilder final : public OperatorBuilderBase {
public:
- EmptySourceOperatorBuilder(int32_t id, const RowDescriptor& row_descriptor)
- : OperatorBuilderBase(id, "EmptySourceOperator"), _row_descriptor(row_descriptor) {}
+ EmptySourceOperatorBuilder(int32_t id, const RowDescriptor& row_descriptor, ExecNode* exec_node)
+ : OperatorBuilderBase(id, "EmptySourceOperator"),
+ _row_descriptor(row_descriptor),
+ _exec_node(exec_node) {}
bool is_source() const override { return true; }
@@ -48,11 +50,14 @@ public:
private:
RowDescriptor _row_descriptor;
+ ExecNode* _exec_node = nullptr;
};
class EmptySourceOperator final : public OperatorBase {
public:
- EmptySourceOperator(OperatorBuilderBase* builder) : OperatorBase(builder) {}
+ EmptySourceOperator(OperatorBuilderBase* builder, ExecNode* exec_node)
+ : OperatorBase(builder), _exec_node(exec_node) {}
+
bool can_read() override { return true; }
bool is_pending_finish() const override { return false; }
@@ -67,6 +72,14 @@ public:
}
Status sink(RuntimeState*, vectorized::Block*, SourceState) override { return Status::OK(); }
+
+ Status close(RuntimeState* state) override {
+ _exec_node->close(state);
+ return Status::OK();
+ }
+
+private:
+ ExecNode* _exec_node = nullptr;
};
} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 32207ad216..ebb30fe384 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -596,7 +596,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
RETURN_IF_ERROR(_build_pipelines(node->child(1), new_pipe));
} else {
OperatorBuilderPtr builder = std::make_shared<EmptySourceOperatorBuilder>(
- next_operator_builder_id(), node->child(1)->row_desc());
+ next_operator_builder_id(), node->child(1)->row_desc(), node->child(1));
new_pipe->add_operator(builder);
}
OperatorBuilderPtr join_sink =
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org