You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/12/03 06:53:05 UTC

[GitHub] [doris] HappenLee opened a new pull request, #14787: [Refactor](pipeline) Refactor operator and builder code of pipeline

HappenLee opened a new pull request, #14787:
URL: https://github.com/apache/doris/pull/14787

   # Proposed changes
   
   Issue Number: close #xxx
   
   ## Problem summary
   
   Describe your changes.
   
   ## Checklist(Required)
   
   1. Does it affect the original behavior: 
       - [ ] Yes
       - [ ] No
       - [ ] I don't know
   2. Has unit tests been added:
       - [ ] Yes
       - [ ] No
       - [ ] No Need
   3. Has document been added or modified:
       - [ ] Yes
       - [ ] No
       - [ ] No Need
   4. Does it need to update dependencies:
       - [ ] Yes
       - [ ] No
   5. Are there any changes that cannot be rolled back:
       - [ ] Yes (If Yes, please explain WHY)
       - [ ] No
   
   ## Further comments
   
   If this is a relatively large or complex change, kick off the discussion at [dev@doris.apache.org](mailto:dev@doris.apache.org) by explaining why you chose this solution and what alternatives you considered.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] github-actions[bot] commented on a diff in pull request #14787: [Refactor](pipeline) Refactor operator and builder code of pipeline

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on code in PR #14787:
URL: https://github.com/apache/doris/pull/14787#discussion_r1038950310


##########
be/src/pipeline/exec/operator.h:
##########
@@ -96,20 +169,14 @@
     virtual bool can_write() { return false; } // for sink
 
     // for pipeline
-    virtual Status get_block([[maybe_unused]] RuntimeState* runtime_state,
-                             [[maybe_unused]] vectorized::Block* block,
-                             [[maybe_unused]] SourceState& result_state) {
-        std::stringstream error_msg;
-        error_msg << " has not implements get_block";
-        return Status::NotSupported(error_msg.str());
-    }
+    virtual Status get_block(RuntimeState* runtime_state, vectorized::Block* block,
+                             SourceState& result_state) {

Review Comment:
   warning: parameter 'runtime_state' is unused [misc-unused-parameters]
   
   ```suggestion
       virtual Status get_block(RuntimeState* /*runtime_state*/, vectorized::Block* block,
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -96,20 +169,14 @@
     virtual bool can_write() { return false; } // for sink
 
     // for pipeline
-    virtual Status get_block([[maybe_unused]] RuntimeState* runtime_state,
-                             [[maybe_unused]] vectorized::Block* block,
-                             [[maybe_unused]] SourceState& result_state) {
-        std::stringstream error_msg;
-        error_msg << " has not implements get_block";
-        return Status::NotSupported(error_msg.str());
-    }
+    virtual Status get_block(RuntimeState* runtime_state, vectorized::Block* block,
+                             SourceState& result_state) {
+        return Status::OK();

Review Comment:
   warning: parameter 'block' is unused [misc-unused-parameters]
   
   ```suggestion
       virtual Status get_block(RuntimeState* runtime_state, vectorized::Block* /*block*/,
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +220,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+)) {};
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
   
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +220,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+)) {};
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       Status prepare(RuntimeState* state) override {
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +220,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+)) {};
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+

Review Comment:
   warning: parameter 'state' is unused [misc-unused-parameters]
   
   ```suggestion
       virtual Status finalize(RuntimeState* /*state*/) override { return Status::OK(); }
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +220,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+)) {};
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
   
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -33,55 +42,119 @@ enum class SourceState : uint8_t {
     FINISHED = 2
 };
 
-//
 enum class SinkState : uint8_t {
     SINK_IDLE = 0, // can send block to sink
     SINK_BUSY = 1, // sink buffer is full, should wait sink to send some block
     FINISHED = 2
 };
 ////////////////       DO NOT USE THE UP State     ////////////////
 
-class OperatorBuilder;
-class Operator;
+class OperatorBuilderBase;
+class OperatorBase;
 
-using OperatorPtr = std::shared_ptr<Operator>;
+using OperatorPtr = std::shared_ptr<OperatorBase>;
 using Operators = std::vector<OperatorPtr>;
 
-class Operator {
+using OperatorBuilderPtr = std::shared_ptr<OperatorBuilderBase>;
+using OperatorBuilders = std::vector<OperatorBuilderPtr>;
+
+class OperatorBuilderBase {
+public:
+    OperatorBuilderBase(int32_t id, const std::string& name) : _id(id), _name(name) {}
+
+    virtual ~OperatorBuilderBase() = default;
+
+    virtual OperatorPtr build_operator() = 0;
+
+    virtual bool is_sink() const { return false; }
+    virtual bool is_source() const { return false; }
+
+    // create the object used by all operator
+    virtual Status prepare(RuntimeState* state);
+
+    // destory the object used by all operator
+    virtual void close(RuntimeState* state);
+
+    std::string get_name() const { return _name; }
+
+    RuntimeState* runtime_state() { return _state; }
+
+    virtual const RowDescriptor& row_desc() = 0;
+
+    int32_t id() const { return _id; }
+
+protected:
+    const int32_t _id;
+    const std::string _name;
+
+    RuntimeState* _state = nullptr;
+    bool _is_closed = false;
+};
+
+template <typename NodeType>
+class OperatorBuilder : public OperatorBuilderBase {
 public:
-    explicit Operator(OperatorBuilder* operator_builder);
-    virtual ~Operator() = default;
+    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
+            : OperatorBuilderBase(id, name), _node(reinterpret_cast<NodeType*>(exec_node)) {}
+
+
+    const RowDescriptor& row_desc() override { return _node->row_desc(); }
+
+    NodeType* exec_node() const { return _node; }
+
+protected:
+    NodeType* _node;
+};
+
+template <typename SinkType>
+class DataSinkOperatorBuilder : public OperatorBuilderBase {
+public:
+    DataSinkOperatorBuilder(int32_t id, const std::string& name, DataSink* sink = nullptr)
+            : OperatorBuilderBase(id, name), _sink(reinterpret_cast<SinkType*>(sink)) {}
+
+

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
   
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +220,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+)) {};
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;
+
+    virtual Status prepare(RuntimeState* state) override {
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        _node->increase_ref();
+        return Status::OK();
+    }
+
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        RETURN_IF_ERROR(_node->alloc_resource(state));
+        return Status::OK();
+    }
+
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
   
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +220,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+)) {};
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;
+
+    virtual Status prepare(RuntimeState* state) override {
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       Status prepare(RuntimeState* state) override {
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +220,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+)) {};
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;
+
+    virtual Status prepare(RuntimeState* state) override {
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));

Review Comment:
   warning: parameter 'state' is unused [misc-unused-parameters]
   
   ```suggestion
       virtual Status prepare(RuntimeState* /*state*/) override {
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +220,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+)) {};
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;
+
+    virtual Status prepare(RuntimeState* state) override {
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        _node->increase_ref();
+        return Status::OK();
+    }
+
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        RETURN_IF_ERROR(_node->alloc_resource(state));
+        return Status::OK();
+    }
+
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _node->sink(state, in_block, source_state == SourceState::FINISHED);
+    }
+
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_node);

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
   
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +220,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+)) {};
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;
+
+    virtual Status prepare(RuntimeState* state) override {
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        _node->increase_ref();
+        return Status::OK();
+    }
+
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        RETURN_IF_ERROR(_node->alloc_resource(state));
+        return Status::OK();
+    }
+
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _node->sink(state, in_block, source_state == SourceState::FINISHED);
+    }
+
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_node);
+        if (!_node->decrease_ref()) {
+            _node->release_resource(state);
+        }
+        return Status::OK();
+    }
+
+    virtual Status get_block(RuntimeState* state, vectorized::Block* block,
+                             SourceState& source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        bool eos = false;
+        RETURN_IF_ERROR(_node->pull(state, block, &eos));
+        source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE;
+        return Status::OK();
+    }
+
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+    virtual bool can_read() override { return _node->can_read(); }
 
 protected:
-    const int32_t _id;
-    const std::string _name;
-    ExecNode* _related_exec_node;
+    void _fresh_exec_timer(NodeType* node) {
+        node->runtime_profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    RuntimeState* _state = nullptr;
-    bool _is_closed = false;
+    NodeType* _node;
 };
 
-using OperatorBuilderPtr = std::shared_ptr<OperatorBuilder>;
-using OperatorBuilders = std::vector<OperatorBuilderPtr>;
+template <typename OperatorBuilderType>
+class DataStateOperator : public Operator<OperatorBuilderType> {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
+
+    DataStateOperator(OperatorBuilderBase* builder, ExecNode* node)
+            : Operator<OperatorBuilderType>(builder, node),
+              _child_block(new vectorized::Block),
+              _child_source_state(SourceState::DEPEND_ON_SOURCE) {};
+
+E) {};
+
+    virtual Status get_block(RuntimeState* state, vectorized::Block* block,
+                             SourceState& source_state) override {

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       Status get_block(RuntimeState* state, vectorized::Block* block,
                        SourceState& source_state) override {
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +220,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+)) {};
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;
+

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       ~Operator() override = default;
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -96,20 +169,14 @@
     virtual bool can_write() { return false; } // for sink
 
     // for pipeline
-    virtual Status get_block([[maybe_unused]] RuntimeState* runtime_state,
-                             [[maybe_unused]] vectorized::Block* block,
-                             [[maybe_unused]] SourceState& result_state) {
-        std::stringstream error_msg;
-        error_msg << " has not implements get_block";
-        return Status::NotSupported(error_msg.str());
-    }
+    virtual Status get_block(RuntimeState* runtime_state, vectorized::Block* block,
+                             SourceState& result_state) {
+        return Status::OK();
+    };

Review Comment:
   warning: parameter 'result_state' is unused [misc-unused-parameters]
   
   ```suggestion
       virtual Status get_block(RuntimeState* runtime_state, vectorized::Block* block,
                                SourceState& /*result_state*/) {
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +220,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+)) {};
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
   
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +220,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+)) {};
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
   
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +220,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+)) {};
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;
+
+    virtual Status prepare(RuntimeState* state) override {
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        _node->increase_ref();
+        return Status::OK();
+    }
+
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        RETURN_IF_ERROR(_node->alloc_resource(state));
+        return Status::OK();
+    }
+
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _node->sink(state, in_block, source_state == SourceState::FINISHED);
+    }
+
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_node);
+        if (!_node->decrease_ref()) {
+            _node->release_resource(state);
+        }
+        return Status::OK();
+    }
+
+    virtual Status get_block(RuntimeState* state, vectorized::Block* block,
+                             SourceState& source_state) override {

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
   
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +220,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+)) {};
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;
+
+    virtual Status prepare(RuntimeState* state) override {
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        _node->increase_ref();
+        return Status::OK();
+    }
+
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
   
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +220,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+)) {};
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;
+
+    virtual Status prepare(RuntimeState* state) override {
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        _node->increase_ref();
+        return Status::OK();
+    }
+
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        RETURN_IF_ERROR(_node->alloc_resource(state));
+        return Status::OK();
+    }
+
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _node->sink(state, in_block, source_state == SourceState::FINISHED);
+    }
+
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_node);
+        if (!_node->decrease_ref()) {
+            _node->release_resource(state);
+        }
+        return Status::OK();
+    }
+
+    virtual Status get_block(RuntimeState* state, vectorized::Block* block,
+                             SourceState& source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        bool eos = false;
+        RETURN_IF_ERROR(_node->pull(state, block, &eos));
+        source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE;
+        return Status::OK();
+    }
+
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
   
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +220,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() override = default;
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;
+
+    virtual Status prepare(RuntimeState* state) override {
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        _node->increase_ref();
+        return Status::OK();
+    }
+
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        RETURN_IF_ERROR(_node->alloc_resource(state));
+        return Status::OK();
+    }
+
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _node->sink(state, in_block, source_state == SourceState::FINISHED);
+    }
+
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_node);
+        if (!_node->decrease_ref()) {
+            _node->release_resource(state);
+        }
+        return Status::OK();
+    }
+
+    virtual Status get_block(RuntimeState* state, vectorized::Block* block,
+                             SourceState& source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        bool eos = false;
+        RETURN_IF_ERROR(_node->pull(state, block, &eos));
+        source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE;
+        return Status::OK();
+    }
+
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+

Review Comment:
   warning: parameter 'state' is unused [misc-unused-parameters]
   
   ```suggestion
       virtual Status finalize(RuntimeState* /*state*/) override { return Status::OK(); }
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +220,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() override = default;
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;
+
+    virtual Status prepare(RuntimeState* state) override {
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        _node->increase_ref();
+        return Status::OK();
+    }
+
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        RETURN_IF_ERROR(_node->alloc_resource(state));
+        return Status::OK();
+    }
+
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _node->sink(state, in_block, source_state == SourceState::FINISHED);
+    }
+
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_node);
+        if (!_node->decrease_ref()) {
+            _node->release_resource(state);
+        }
+        return Status::OK();
+    }
+
+    virtual Status get_block(RuntimeState* state, vectorized::Block* block,
+                             SourceState& source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        bool eos = false;
+        RETURN_IF_ERROR(_node->pull(state, block, &eos));
+        source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE;
+        return Status::OK();
+    }
+
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+    virtual bool can_read() override { return _node->can_read(); }
 
 protected:
-    const int32_t _id;
-    const std::string _name;
-    ExecNode* _related_exec_node;
+    void _fresh_exec_timer(NodeType* node) {
+        node->runtime_profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    RuntimeState* _state = nullptr;
-    bool _is_closed = false;
+    NodeType* _node;
 };
 
-using OperatorBuilderPtr = std::shared_ptr<OperatorBuilder>;
-using OperatorBuilders = std::vector<OperatorBuilderPtr>;
+template <typename OperatorBuilderType>
+class DataStateOperator : public Operator<OperatorBuilderType> {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
+
+    DataStateOperator(OperatorBuilderBase* builder, ExecNode* node)
+            : Operator<OperatorBuilderType>(builder, node),
+              _child_block(new vectorized::Block),
+              _child_source_state(SourceState::DEPEND_ON_SOURCE) {};
+
+E) {};
+

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
   E) {};
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] github-actions[bot] commented on a diff in pull request #14787: [Refactor](pipeline) Refactor operator and builder code of pipeline

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on code in PR #14787:
URL: https://github.com/apache/doris/pull/14787#discussion_r1038741810


##########
be/src/pipeline/exec/operator.cpp:
##########
@@ -19,53 +19,30 @@
 
 namespace doris::pipeline {
 
-Operator::Operator(OperatorBuilder* operator_builder)
+OperatorBase::OperatorBase(OperatorBuilderBase* operator_builder)
         : _operator_builder(operator_builder), _is_closed(false) {}
 
-bool Operator::is_sink() const {
+bool OperatorBase::is_sink() const {
     return _operator_builder->is_sink();
 }
 
-bool Operator::is_source() const {
+bool OperatorBase::is_source() const {
     return _operator_builder->is_source();
 }
 
-Status Operator::init(ExecNode* exec_node, RuntimeState* state) {
-    _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
-    if (exec_node) {
-        exec_node->runtime_profile()->insert_child_head(_runtime_profile.get(), true);
-    }
-    return Status::OK();
-}
-
-Status Operator::prepare(RuntimeState* state) {
-    _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
-                                                _runtime_profile.get());
-    return Status::OK();
-}
-
-Status Operator::open(RuntimeState* state) {
-    return Status::OK();
-}
-
-Status Operator::close(RuntimeState* state) {
+Status OperatorBase::close(RuntimeState* state) {

Review Comment:
   warning: parameter 'state' is unused [misc-unused-parameters]
   
   ```suggestion
   Status OperatorBase::close(RuntimeState*  /*state*/) {
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -96,20 +171,14 @@
     virtual bool can_write() { return false; } // for sink
 
     // for pipeline
-    virtual Status get_block([[maybe_unused]] RuntimeState* runtime_state,
-                             [[maybe_unused]] vectorized::Block* block,
-                             [[maybe_unused]] SourceState& result_state) {
-        std::stringstream error_msg;
-        error_msg << " has not implements get_block";
-        return Status::NotSupported(error_msg.str());
-    }
+    virtual Status get_block(RuntimeState* runtime_state, vectorized::Block* block,
+                             SourceState& result_state) {
+        return Status::OK();

Review Comment:
   warning: parameter 'result_state' is unused [misc-unused-parameters]
   
   ```suggestion
       virtual Status get_block(RuntimeState* runtime_state, vectorized::Block* block,
                                SourceState& /*result_state*/) {
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -96,20 +171,14 @@
     virtual bool can_write() { return false; } // for sink
 
     // for pipeline
-    virtual Status get_block([[maybe_unused]] RuntimeState* runtime_state,
-                             [[maybe_unused]] vectorized::Block* block,
-                             [[maybe_unused]] SourceState& result_state) {
-        std::stringstream error_msg;
-        error_msg << " has not implements get_block";
-        return Status::NotSupported(error_msg.str());
-    }
+    virtual Status get_block(RuntimeState* runtime_state, vectorized::Block* block,
+                             SourceState& result_state) {

Review Comment:
   warning: parameter 'block' is unused [misc-unused-parameters]
   
   ```suggestion
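       virtual Status get_block(RuntimeState* runtime_state, vectorized::Block* /*block*/,
                                SourceState& result_state) {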
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -33,55 +42,121 @@
     FINISHED = 2
 };
 
-//
 enum class SinkState : uint8_t {
     SINK_IDLE = 0, // can send block to sink
     SINK_BUSY = 1, // sink buffer is full, should wait sink to send some block
     FINISHED = 2
 };
 ////////////////       DO NOT USE THE UP State     ////////////////
 
-class OperatorBuilder;
-class Operator;
+class OperatorBuilderBase;
+class OperatorBase;
 
-using OperatorPtr = std::shared_ptr<Operator>;
+using OperatorPtr = std::shared_ptr<OperatorBase>;
 using Operators = std::vector<OperatorPtr>;
 
-class Operator {
+using OperatorBuilderPtr = std::shared_ptr<OperatorBuilderBase>;
+using OperatorBuilders = std::vector<OperatorBuilderPtr>;
+
+class OperatorBuilderBase {
+public:
+    OperatorBuilderBase(int32_t id, const std::string& name) : _id(id), _name(name) {}
+
+    virtual ~OperatorBuilderBase() = default;
+
+    virtual OperatorPtr build_operator() = 0;
+
+    virtual bool is_sink() const { return false; }
+    virtual bool is_source() const { return false; }
+
+    // create the object used by all operator
+    virtual Status prepare(RuntimeState* state);
+
+    // destory the object used by all operator
+    virtual void close(RuntimeState* state);
+
+    std::string get_name() const { return _name; }
+
+    RuntimeState* runtime_state() { return _state; }
+
+    virtual const RowDescriptor& row_desc() = 0;
+
+    int32_t id() const { return _id; }
+
+protected:
+    const int32_t _id;
+    const std::string _name;
+
+    RuntimeState* _state = nullptr;
+    bool _is_closed = false;
+};
+
+template <typename NodeType>
+class OperatorBuilder : public OperatorBuilderBase {
 public:
-    explicit Operator(OperatorBuilder* operator_builder);
-    virtual ~Operator() = default;
+    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
+            : OperatorBuilderBase(id, name), _node(reinterpret_cast<NodeType*>(exec_node)) {}
+
+    virtual ~OperatorBuilder() override = default;
+
+    const RowDescriptor& row_desc() override { return _node->row_desc(); }
+
+    NodeType* exec_node() const { return _node; }
+
+protected:
+    NodeType* _node;
+};
+
+template <typename SinkType>
+class DataSinkOperatorBuilder : public OperatorBuilderBase {
+public:
+    DataSinkOperatorBuilder(int32_t id, const std::string& name, DataSink* sink = nullptr)
+            : OperatorBuilderBase(id, name), _sink(reinterpret_cast<SinkType*>(sink)) {}
+
+    virtual ~DataSinkOperatorBuilder() override = default;

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       ~DataSinkOperatorBuilder() override = default;
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -33,55 +42,121 @@ enum class SourceState : uint8_t {
     FINISHED = 2
 };
 
-//
 enum class SinkState : uint8_t {
     SINK_IDLE = 0, // can send block to sink
     SINK_BUSY = 1, // sink buffer is full, should wait sink to send some block
     FINISHED = 2
 };
 ////////////////       DO NOT USE THE UP State     ////////////////
 
-class OperatorBuilder;
-class Operator;
+class OperatorBuilderBase;
+class OperatorBase;
 
-using OperatorPtr = std::shared_ptr<Operator>;
+using OperatorPtr = std::shared_ptr<OperatorBase>;
 using Operators = std::vector<OperatorPtr>;
 
-class Operator {
+using OperatorBuilderPtr = std::shared_ptr<OperatorBuilderBase>;
+using OperatorBuilders = std::vector<OperatorBuilderPtr>;
+
+class OperatorBuilderBase {
+public:
+    OperatorBuilderBase(int32_t id, const std::string& name) : _id(id), _name(name) {}
+
+    virtual ~OperatorBuilderBase() = default;
+
+    virtual OperatorPtr build_operator() = 0;
+
+    virtual bool is_sink() const { return false; }
+    virtual bool is_source() const { return false; }
+
+    // create the object used by all operator
+    virtual Status prepare(RuntimeState* state);
+
+    // destory the object used by all operator
+    virtual void close(RuntimeState* state);
+
+    std::string get_name() const { return _name; }
+
+    RuntimeState* runtime_state() { return _state; }
+
+    virtual const RowDescriptor& row_desc() = 0;
+
+    int32_t id() const { return _id; }
+
+protected:
+    const int32_t _id;
+    const std::string _name;
+
+    RuntimeState* _state = nullptr;
+    bool _is_closed = false;
+};
+
+template <typename NodeType>
+class OperatorBuilder : public OperatorBuilderBase {
 public:
-    explicit Operator(OperatorBuilder* operator_builder);
-    virtual ~Operator() = default;
+    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
+            : OperatorBuilderBase(id, name), _node(reinterpret_cast<NodeType*>(exec_node)) {}
+
+    virtual ~OperatorBuilder() override = default;

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       ~OperatorBuilder() override = default;
   ```
   



##########
be/src/pipeline/exec/operator.cpp:
##########
@@ -75,13 +52,13 @@
 
 /////////////////////////////////////// OperatorBuilder ////////////////////////////////////////////////////////////
 
-Status OperatorBuilder::prepare(doris::RuntimeState* state) {
+Status OperatorBuilderBase::prepare(doris::RuntimeState* state) {
     _state = state;
     // runtime filter, now dispose by NewOlapScanNode
     return Status::OK();
 }
 
-void OperatorBuilder::close(doris::RuntimeState* state) {
+void OperatorBuilderBase::close(doris::RuntimeState* state) {

Review Comment:
   warning: parameter 'state' is unused [misc-unused-parameters]
   
   ```suggestion
   void OperatorBuilderBase::close(doris::RuntimeState*  /*state*/) {
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() override = default;
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       Status sink(RuntimeState* state, vectorized::Block* in_block,
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() override = default;
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       Status finalize(RuntimeState* state) override { return Status::OK(); }
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() override = default;
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       Status prepare(RuntimeState* state) override {
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() override = default;
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;
+
+    virtual Status prepare(RuntimeState* state) override {

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       Status prepare(RuntimeState* state) override {
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() override = default;
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }

Review Comment:
   warning: parameter 'state' is unused [misc-unused-parameters]
   
   ```suggestion
       virtual Status finalize(RuntimeState* /*state*/) override { return Status::OK(); }
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() override = default;
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;
+
+    virtual Status prepare(RuntimeState* state) override {
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        _node->increase_ref();
+        return Status::OK();
+    }
+
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        RETURN_IF_ERROR(_node->alloc_resource(state));
+        return Status::OK();
+    }
+
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       Status sink(RuntimeState* state, vectorized::Block* in_block,
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() override = default;
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;
+
+    virtual Status prepare(RuntimeState* state) override {

Review Comment:
   warning: parameter 'state' is unused [misc-unused-parameters]
   
   ```suggestion
       virtual Status prepare(RuntimeState* /*state*/) override {
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() override = default;
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;
+
+    virtual Status prepare(RuntimeState* state) override {
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        _node->increase_ref();
+        return Status::OK();
+    }
+
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        RETURN_IF_ERROR(_node->alloc_resource(state));
+        return Status::OK();
+    }
+
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _node->sink(state, in_block, source_state == SourceState::FINISHED);
+    }
+
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_node);
+        if (!_node->decrease_ref()) {
+            _node->release_resource(state);
+        }
+        return Status::OK();
+    }
+
+    virtual Status get_block(RuntimeState* state, vectorized::Block* block,
+                             SourceState& source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        bool eos = false;
+        RETURN_IF_ERROR(_node->pull(state, block, &eos));
+        source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE;
+        return Status::OK();
+    }
+
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       Status finalize(RuntimeState* state) override { return Status::OK(); }
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() override = default;
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;
+
+    virtual Status prepare(RuntimeState* state) override {
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        _node->increase_ref();
+        return Status::OK();
+    }
+
+    virtual Status open(RuntimeState* state) override {

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       Status open(RuntimeState* state) override {
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() override = default;
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;
+
+    virtual Status prepare(RuntimeState* state) override {
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        _node->increase_ref();
+        return Status::OK();
+    }
+
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        RETURN_IF_ERROR(_node->alloc_resource(state));
+        return Status::OK();
+    }
+
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _node->sink(state, in_block, source_state == SourceState::FINISHED);
+    }
+
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_node);
+        if (!_node->decrease_ref()) {
+            _node->release_resource(state);
+        }
+        return Status::OK();
+    }
+
+    virtual Status get_block(RuntimeState* state, vectorized::Block* block,
+                             SourceState& source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        bool eos = false;
+        RETURN_IF_ERROR(_node->pull(state, block, &eos));
+        source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE;
+        return Status::OK();
+    }
+
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }

Review Comment:
   warning: parameter 'state' is unused [misc-unused-parameters]
   
   ```suggestion
       virtual Status finalize(RuntimeState* /*state*/) override { return Status::OK(); }
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() override = default;
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;
+
+    virtual Status prepare(RuntimeState* state) override {
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        _node->increase_ref();
+        return Status::OK();
+    }
+
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        RETURN_IF_ERROR(_node->alloc_resource(state));
+        return Status::OK();
+    }
+
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _node->sink(state, in_block, source_state == SourceState::FINISHED);
+    }
+
+    virtual Status close(RuntimeState* state) override {

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       Status close(RuntimeState* state) override {
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() override = default;
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;
+
+    virtual Status prepare(RuntimeState* state) override {
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        _node->increase_ref();
+        return Status::OK();
+    }
+
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        RETURN_IF_ERROR(_node->alloc_resource(state));
+        return Status::OK();
+    }
+
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _node->sink(state, in_block, source_state == SourceState::FINISHED);
+    }
+
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_node);
+        if (!_node->decrease_ref()) {
+            _node->release_resource(state);
+        }
+        return Status::OK();
+    }
+
+    virtual Status get_block(RuntimeState* state, vectorized::Block* block,
+                             SourceState& source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        bool eos = false;
+        RETURN_IF_ERROR(_node->pull(state, block, &eos));
+        source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE;
+        return Status::OK();
+    }
+
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+    virtual bool can_read() override { return _node->can_read(); }

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       bool can_read() override { return _node->can_read(); }
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() override = default;

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       ~DataSinkOperator() override = default;
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() override = default;
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       Status close(RuntimeState* state) override {
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -96,20 +171,14 @@
     virtual bool can_write() { return false; } // for sink
 
     // for pipeline
-    virtual Status get_block([[maybe_unused]] RuntimeState* runtime_state,
-                             [[maybe_unused]] vectorized::Block* block,
-                             [[maybe_unused]] SourceState& result_state) {
-        std::stringstream error_msg;
-        error_msg << " has not implements get_block";
-        return Status::NotSupported(error_msg.str());
-    }
+    virtual Status get_block(RuntimeState* runtime_state, vectorized::Block* block,

Review Comment:
   warning: parameter 'runtime_state' is unused [misc-unused-parameters]
   
   ```suggestion
       virtual Status get_block(RuntimeState* /*runtime_state*/, vectorized::Block* block,
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() override = default;
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       Status open(RuntimeState* state) override {
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() override = default;
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       ~Operator() override = default;
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() override = default;
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;
+
+    virtual Status prepare(RuntimeState* state) override {
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        _node->increase_ref();
+        return Status::OK();
+    }
+
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        RETURN_IF_ERROR(_node->alloc_resource(state));
+        return Status::OK();
+    }
+
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _node->sink(state, in_block, source_state == SourceState::FINISHED);
+    }
+
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_node);
+        if (!_node->decrease_ref()) {
+            _node->release_resource(state);
+        }
+        return Status::OK();
+    }
+
+    virtual Status get_block(RuntimeState* state, vectorized::Block* block,

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       Status get_block(RuntimeState* state, vectorized::Block* block,
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() override = default;
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;
+
+    virtual Status prepare(RuntimeState* state) override {
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        _node->increase_ref();
+        return Status::OK();
+    }
+
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        RETURN_IF_ERROR(_node->alloc_resource(state));
+        return Status::OK();
+    }
+
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _node->sink(state, in_block, source_state == SourceState::FINISHED);
+    }
+
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_node);
+        if (!_node->decrease_ref()) {
+            _node->release_resource(state);
+        }
+        return Status::OK();
+    }
+
+    virtual Status get_block(RuntimeState* state, vectorized::Block* block,
+                             SourceState& source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        bool eos = false;
+        RETURN_IF_ERROR(_node->pull(state, block, &eos));
+        source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE;
+        return Status::OK();
+    }
+
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+    virtual bool can_read() override { return _node->can_read(); }
 
 protected:
-    const int32_t _id;
-    const std::string _name;
-    ExecNode* _related_exec_node;
+    void _fresh_exec_timer(NodeType* node) {
+        node->runtime_profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    RuntimeState* _state = nullptr;
-    bool _is_closed = false;
+    NodeType* _node;
 };
 
-using OperatorBuilderPtr = std::shared_ptr<OperatorBuilder>;
-using OperatorBuilders = std::vector<OperatorBuilderPtr>;
+template <typename OperatorBuilderType>
+class DataStateOperator : public Operator<OperatorBuilderType> {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
+
+    DataStateOperator(OperatorBuilderBase* builder, ExecNode* node)
+            : Operator<OperatorBuilderType>(builder, node),
+              _child_block(new vectorized::Block),
+              _child_source_state(SourceState::DEPEND_ON_SOURCE) {};
+
+    virtual ~DataStateOperator() override = default;

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       ~DataStateOperator() override = default;
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] HappenLee merged pull request #14787: [Refactor](pipeline) Refactor operator and builder code of pipeline

Posted by GitBox <gi...@apache.org>.
HappenLee merged PR #14787:
URL: https://github.com/apache/doris/pull/14787


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] BiteTheDDDDt commented on a diff in pull request #14787: [Refactor](pipeline) Refactor operator and builder code of pipeline

Posted by GitBox <gi...@apache.org>.
BiteTheDDDDt commented on code in PR #14787:
URL: https://github.com/apache/doris/pull/14787#discussion_r1039097244


##########
tools/ssb-tools/conf/doris-cluster.conf:
##########
@@ -26,4 +26,4 @@ export USER='root'
 # Doris password
 export PASSWORD=''
 # The database where SSB tables located
-export DB='ssb'
+export DB='regression_test_load_p0_stream_load'

Review Comment:
   This change needs to be reset: please revert it to the original `export DB='ssb'`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] qzsee commented on a diff in pull request #14787: [Refactor](pipeline) Refactor operator and builder code of pipeline

Posted by GitBox <gi...@apache.org>.
qzsee commented on code in PR #14787:
URL: https://github.com/apache/doris/pull/14787#discussion_r1038901162


##########
be/src/pipeline/exec/aggregation_source_operator.cpp:
##########
@@ -22,46 +22,7 @@
 namespace doris {
 namespace pipeline {
 
-AggregationSourceOperator::AggregationSourceOperator(OperatorBuilder* templ,
-                                                     vectorized::AggregationNode* node)
-        : Operator(templ), _agg_node(node) {}
-
-Status AggregationSourceOperator::prepare(RuntimeState* state) {
-    _agg_node->increase_ref();
-    return Status::OK();
-}
-
-bool AggregationSourceOperator::can_read() {
-    return _agg_node->can_read();
-}
-
-Status AggregationSourceOperator::get_block(RuntimeState* state, vectorized::Block* block,
-                                            SourceState& source_state) {
-    SCOPED_TIMER(_runtime_profile->total_time_counter());
-    bool eos = false;
-    RETURN_IF_ERROR(_agg_node->pull(state, block, &eos));
-    source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE;
-    return Status::OK();
-}
-
-Status AggregationSourceOperator::close(RuntimeState* state) {
-    _fresh_exec_timer(_agg_node);
-    if (!_agg_node->decrease_ref()) {
-        _agg_node->release_resource(state);
-    }
-    return Status::OK();
-}
-
-///////////////////////////////  operator template  ////////////////////////////////
-
-AggregationSourceOperatorBuilder::AggregationSourceOperatorBuilder(
-        int32_t id, const std::string& name, vectorized::AggregationNode* exec_node)
-        : OperatorBuilder(id, name, exec_node) {}
-
-OperatorPtr AggregationSourceOperatorBuilder::build_operator() {
-    return std::make_shared<AggregationSourceOperator>(
-            this, assert_cast<vectorized::AggregationNode*>(_related_exec_node));
-}
+OPERATOR_CODE_GENERATOR(AggregationSourceOperator, Operator)

Review Comment:
   Should `AggregationSourceOperator` be renamed to `AggSourceOperator`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] github-actions[bot] commented on a diff in pull request #14787: [Refactor](pipeline) Refactor operator and builder code of pipeline

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on code in PR #14787:
URL: https://github.com/apache/doris/pull/14787#discussion_r1038950124


##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +220,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+)) {};
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;
+
+    virtual Status prepare(RuntimeState* state) override {
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        _node->increase_ref();
+        return Status::OK();
+    }
+
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        RETURN_IF_ERROR(_node->alloc_resource(state));
+        return Status::OK();
+    }
+
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _node->sink(state, in_block, source_state == SourceState::FINISHED);
+    }
+
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_node);
+        if (!_node->decrease_ref()) {
+            _node->release_resource(state);
+        }
+        return Status::OK();
+    }
+
+    virtual Status get_block(RuntimeState* state, vectorized::Block* block,
+                             SourceState& source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        bool eos = false;
+        RETURN_IF_ERROR(_node->pull(state, block, &eos));
+        source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE;
+        return Status::OK();
+    }
+
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+    virtual bool can_read() override { return _node->can_read(); }
 
 protected:
-    const int32_t _id;
-    const std::string _name;
-    ExecNode* _related_exec_node;
+    void _fresh_exec_timer(NodeType* node) {
+        node->runtime_profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    RuntimeState* _state = nullptr;
-    bool _is_closed = false;
+    NodeType* _node;
 };
 
-using OperatorBuilderPtr = std::shared_ptr<OperatorBuilder>;
-using OperatorBuilders = std::vector<OperatorBuilderPtr>;
+template <typename OperatorBuilderType>
+class DataStateOperator : public Operator<OperatorBuilderType> {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
+
+    DataStateOperator(OperatorBuilderBase* builder, ExecNode* node)
+            : Operator<OperatorBuilderType>(builder, node),
+              _child_block(new vectorized::Block),
+              _child_source_state(SourceState::DEPEND_ON_SOURCE) {};
+
+E) {};

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
   E) {};
   ```
   



##########
be/src/vec/exec/scan/vscan_node.h:
##########
@@ -49,6 +49,8 @@ class VScanNode : public ExecNode {
 public:
     VScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
             : ExecNode(pool, tnode, descs), _runtime_filter_descs(tnode.runtime_filters) {}
+    virtual ~VScanNode() = default;

Review Comment:
   warning: prefer using 'override' or (rarely) 'final' instead of 'virtual' [modernize-use-override]
   
   ```suggestion
       ~VScanNode() override = default;
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +220,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+)) {};
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;
+
+    virtual Status prepare(RuntimeState* state) override {
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        _node->increase_ref();
+        return Status::OK();
+    }
+
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        RETURN_IF_ERROR(_node->alloc_resource(state));
+        return Status::OK();
+    }
+
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _node->sink(state, in_block, source_state == SourceState::FINISHED);
+    }
+
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_node);
+        if (!_node->decrease_ref()) {
+            _node->release_resource(state);
+        }
+        return Status::OK();
+    }
+
+    virtual Status get_block(RuntimeState* state, vectorized::Block* block,
+                             SourceState& source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        bool eos = false;
+        RETURN_IF_ERROR(_node->pull(state, block, &eos));
+        source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE;
+        return Status::OK();
+    }
+
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+    virtual bool can_read() override { return _node->can_read(); }
 
 protected:
-    const int32_t _id;
-    const std::string _name;
-    ExecNode* _related_exec_node;
+    void _fresh_exec_timer(NodeType* node) {
+        node->runtime_profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    RuntimeState* _state = nullptr;
-    bool _is_closed = false;
+    NodeType* _node;
 };
 
-using OperatorBuilderPtr = std::shared_ptr<OperatorBuilder>;
-using OperatorBuilders = std::vector<OperatorBuilderPtr>;
+template <typename OperatorBuilderType>
+class DataStateOperator : public Operator<OperatorBuilderType> {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
+
+    DataStateOperator(OperatorBuilderBase* builder, ExecNode* node)
+            : Operator<OperatorBuilderType>(builder, node),
+              _child_block(new vectorized::Block),
+              _child_source_state(SourceState::DEPEND_ON_SOURCE) {};
+
+E) {};
+
+    virtual Status get_block(RuntimeState* state, vectorized::Block* block,

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
   fault;
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +220,163 @@ class Operator {
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+)) {};

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
   )) {};
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] HappenLee commented on a diff in pull request #14787: [Refactor](pipeline) Refactor operator and builder code of pipeline

Posted by GitBox <gi...@apache.org>.
HappenLee commented on code in PR #14787:
URL: https://github.com/apache/doris/pull/14787#discussion_r1038948914


##########
be/src/pipeline/exec/aggregation_source_operator.cpp:
##########
@@ -22,46 +22,7 @@
 namespace doris {
 namespace pipeline {
 
-AggregationSourceOperator::AggregationSourceOperator(OperatorBuilder* templ,
-                                                     vectorized::AggregationNode* node)
-        : Operator(templ), _agg_node(node) {}
-
-Status AggregationSourceOperator::prepare(RuntimeState* state) {
-    _agg_node->increase_ref();
-    return Status::OK();
-}
-
-bool AggregationSourceOperator::can_read() {
-    return _agg_node->can_read();
-}
-
-Status AggregationSourceOperator::get_block(RuntimeState* state, vectorized::Block* block,
-                                            SourceState& source_state) {
-    SCOPED_TIMER(_runtime_profile->total_time_counter());
-    bool eos = false;
-    RETURN_IF_ERROR(_agg_node->pull(state, block, &eos));
-    source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE;
-    return Status::OK();
-}
-
-Status AggregationSourceOperator::close(RuntimeState* state) {
-    _fresh_exec_timer(_agg_node);
-    if (!_agg_node->decrease_ref()) {
-        _agg_node->release_resource(state);
-    }
-    return Status::OK();
-}
-
-///////////////////////////////  operator template  ////////////////////////////////
-
-AggregationSourceOperatorBuilder::AggregationSourceOperatorBuilder(
-        int32_t id, const std::string& name, vectorized::AggregationNode* exec_node)
-        : OperatorBuilder(id, name, exec_node) {}
-
-OperatorPtr AggregationSourceOperatorBuilder::build_operator() {
-    return std::make_shared<AggregationSourceOperator>(
-            this, assert_cast<vectorized::AggregationNode*>(_related_exec_node));
-}
+OPERATOR_CODE_GENERATOR(AggregationSourceOperator, Operator)

Review Comment:
   No, the name is still `AggregationSourceOperator`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] github-actions[bot] commented on a diff in pull request #14787: [Refactor](pipeline) Refactor operator and builder code of pipeline

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on code in PR #14787:
URL: https://github.com/apache/doris/pull/14787#discussion_r1038950499


##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +220,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+)) {};
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
   )) {};
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +220,163 @@ class Operator {
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+)) {};
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {

Review Comment:
   warning: expected member name or ';' after declaration specifiers [clang-diagnostic-error]
   ```cpp
   )) {};
   ^
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +220,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+)) {};
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;
+
+    virtual Status prepare(RuntimeState* state) override {
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        _node->increase_ref();
+        return Status::OK();
+    }
+
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        RETURN_IF_ERROR(_node->alloc_resource(state));
+        return Status::OK();
+    }
+
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _node->sink(state, in_block, source_state == SourceState::FINISHED);
+    }
+
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_node);
+        if (!_node->decrease_ref()) {
+            _node->release_resource(state);
+        }
+        return Status::OK();
+    }
+
+    virtual Status get_block(RuntimeState* state, vectorized::Block* block,
+                             SourceState& source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        bool eos = false;
+        RETURN_IF_ERROR(_node->pull(state, block, &eos));
+        source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE;
+        return Status::OK();
+    }
+
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+    virtual bool can_read() override { return _node->can_read(); }
 
 protected:
-    const int32_t _id;
-    const std::string _name;
-    ExecNode* _related_exec_node;
+    void _fresh_exec_timer(NodeType* node) {
+        node->runtime_profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    RuntimeState* _state = nullptr;
-    bool _is_closed = false;
+    NodeType* _node;
 };
 
-using OperatorBuilderPtr = std::shared_ptr<OperatorBuilder>;
-using OperatorBuilders = std::vector<OperatorBuilderPtr>;
+template <typename OperatorBuilderType>
+class DataStateOperator : public Operator<OperatorBuilderType> {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
+
+    DataStateOperator(OperatorBuilderBase* builder, ExecNode* node)
+            : Operator<OperatorBuilderType>(builder, node),
+              _child_block(new vectorized::Block),
+              _child_source_state(SourceState::DEPEND_ON_SOURCE) {};
+
+E) {};
+
+    virtual Status get_block(RuntimeState* state, vectorized::Block* block,

Review Comment:
   warning: unknown type name 'E' [clang-diagnostic-error]
   ```cpp
   E) {};
   ^
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +220,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+)) {};
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;
+
+    virtual Status prepare(RuntimeState* state) override {
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        _node->increase_ref();
+        return Status::OK();
+    }
+
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        RETURN_IF_ERROR(_node->alloc_resource(state));
+        return Status::OK();
+    }
+
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _node->sink(state, in_block, source_state == SourceState::FINISHED);
+    }
+
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_node);
+        if (!_node->decrease_ref()) {
+            _node->release_resource(state);
+        }
+        return Status::OK();
+    }
+
+    virtual Status get_block(RuntimeState* state, vectorized::Block* block,
+                             SourceState& source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        bool eos = false;
+        RETURN_IF_ERROR(_node->pull(state, block, &eos));
+        source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE;
+        return Status::OK();
+    }
+
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+    virtual bool can_read() override { return _node->can_read(); }
 
 protected:
-    const int32_t _id;
-    const std::string _name;
-    ExecNode* _related_exec_node;
+    void _fresh_exec_timer(NodeType* node) {
+        node->runtime_profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    RuntimeState* _state = nullptr;
-    bool _is_closed = false;
+    NodeType* _node;
 };
 
-using OperatorBuilderPtr = std::shared_ptr<OperatorBuilder>;
-using OperatorBuilders = std::vector<OperatorBuilderPtr>;
+template <typename OperatorBuilderType>
+class DataStateOperator : public Operator<OperatorBuilderType> {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
+
+    DataStateOperator(OperatorBuilderBase* builder, ExecNode* node)
+            : Operator<OperatorBuilderType>(builder, node),
+              _child_block(new vectorized::Block),
+              _child_source_state(SourceState::DEPEND_ON_SOURCE) {};
+
+E) {};
+
+    virtual Status get_block(RuntimeState* state, vectorized::Block* block,

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
   E) {};
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +220,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+)) {};
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;
+
+    virtual Status prepare(RuntimeState* state) override {
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        _node->increase_ref();
+        return Status::OK();
+    }
+
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        RETURN_IF_ERROR(_node->alloc_resource(state));
+        return Status::OK();
+    }
+
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _node->sink(state, in_block, source_state == SourceState::FINISHED);
+    }
+
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_node);
+        if (!_node->decrease_ref()) {
+            _node->release_resource(state);
+        }
+        return Status::OK();
+    }
+
+    virtual Status get_block(RuntimeState* state, vectorized::Block* block,
+                             SourceState& source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        bool eos = false;
+        RETURN_IF_ERROR(_node->pull(state, block, &eos));
+        source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE;
+        return Status::OK();
+    }
+
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+    virtual bool can_read() override { return _node->can_read(); }
 
 protected:
-    const int32_t _id;
-    const std::string _name;
-    ExecNode* _related_exec_node;
+    void _fresh_exec_timer(NodeType* node) {
+        node->runtime_profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    RuntimeState* _state = nullptr;
-    bool _is_closed = false;
+    NodeType* _node;
 };
 
-using OperatorBuilderPtr = std::shared_ptr<OperatorBuilder>;
-using OperatorBuilders = std::vector<OperatorBuilderPtr>;
+template <typename OperatorBuilderType>
+class DataStateOperator : public Operator<OperatorBuilderType> {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
+
+    DataStateOperator(OperatorBuilderBase* builder, ExecNode* node)
+            : Operator<OperatorBuilderType>(builder, node),
+              _child_block(new vectorized::Block),
+              _child_source_state(SourceState::DEPEND_ON_SOURCE) {};
+
+E) {};
+
+    virtual Status get_block(RuntimeState* state, vectorized::Block* block,

Review Comment:
   warning: expected member name or ';' after declaration specifiers [clang-diagnostic-error]
   ```cpp
   E) {};
    ^
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] hello-stephen commented on pull request #14787: [Refactor](pipeline) Refactor operator and builder code of pipeline

Posted by GitBox <gi...@apache.org>.
hello-stephen commented on PR #14787:
URL: https://github.com/apache/doris/pull/14787#issuecomment-1336102815

   TeamCity pipeline, clickbench performance test result:
    the sum of best hot time: 34.75 seconds
    load time: 429 seconds
    storage size: 17123356367 Bytes
    https://doris-community-test-1308700295.cos.ap-hongkong.myqcloud.com/tmp/20221203071945_clickbench_pr_57158.html


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] github-actions[bot] commented on a diff in pull request #14787: [Refactor](pipeline) Refactor operator and builder code of pipeline

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on code in PR #14787:
URL: https://github.com/apache/doris/pull/14787#discussion_r1038954502


##########
be/src/pipeline/exec/operator.h:
##########
@@ -33,55 +42,121 @@
     FINISHED = 2
 };
 
-//
 enum class SinkState : uint8_t {
     SINK_IDLE = 0, // can send block to sink
     SINK_BUSY = 1, // sink buffer is full, should wait sink to send some block
     FINISHED = 2
 };
 ////////////////       DO NOT USE THE UP State     ////////////////
 
-class OperatorBuilder;
-class Operator;
+class OperatorBuilderBase;
+class OperatorBase;
 
-using OperatorPtr = std::shared_ptr<Operator>;
+using OperatorPtr = std::shared_ptr<OperatorBase>;
 using Operators = std::vector<OperatorPtr>;
 
-class Operator {
+using OperatorBuilderPtr = std::shared_ptr<OperatorBuilderBase>;
+using OperatorBuilders = std::vector<OperatorBuilderPtr>;
+
+class OperatorBuilderBase {
 public:
-    explicit Operator(OperatorBuilder* operator_builder);
-    virtual ~Operator() = default;
+    OperatorBuilderBase(int32_t id, const std::string& name) : _id(id), _name(name) {}
+
+    virtual ~OperatorBuilderBase() = default;
+
+    virtual OperatorPtr build_operator() = 0;
+
+    virtual bool is_sink() const { return false; }
+    virtual bool is_source() const { return false; }
+
+    // create the object used by all operator
+    virtual Status prepare(RuntimeState* state);
+
+    // destory the object used by all operator
+    virtual void close(RuntimeState* state);
+
+    std::string get_name() const { return _name; }
+
+    RuntimeState* runtime_state() { return _state; }
+
+    virtual const RowDescriptor& row_desc() = 0;
+
+    int32_t id() const { return _id; }
+
+protected:
+    const int32_t _id;
+    const std::string _name;
+
+    RuntimeState* _state = nullptr;
+    bool _is_closed = false;
+};
+
+template <typename NodeType>
+class OperatorBuilder : public OperatorBuilderBase {
+public:
+    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
+            : OperatorBuilderBase(id, name), _node(reinterpret_cast<NodeType*>(exec_node)) {}
+
+    virtual ~OperatorBuilder() = default;
+
+    const RowDescriptor& row_desc() override { return _node->row_desc(); }
+
+    NodeType* exec_node() const { return _node; }
+
+protected:
+    NodeType* _node;
+};
+
+template <typename SinkType>
+class DataSinkOperatorBuilder : public OperatorBuilderBase {
+public:
+    DataSinkOperatorBuilder(int32_t id, const std::string& name, DataSink* sink = nullptr)
+            : OperatorBuilderBase(id, name), _sink(reinterpret_cast<SinkType*>(sink)) {}
+
+    virtual ~DataSinkOperatorBuilder() = default;

Review Comment:
   warning: prefer using 'override' or (rarely) 'final' instead of 'virtual' [modernize-use-override]
   
   ```suggestion
   override 
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() = default;

Review Comment:
   warning: prefer using 'override' or (rarely) 'final' instead of 'virtual' [modernize-use-override]
   
   ```suggestion
   )) {};override 
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() = default;
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    NodeType* _sink;
+};
 
-    int32_t id() const { return _id; }
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
+
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
+
+    virtual ~Operator() = default;

Review Comment:
   warning: prefer using 'override' or (rarely) 'final' instead of 'virtual' [modernize-use-override]
   
   ```suggestion
   )) {};override 
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -33,55 +42,121 @@ enum class SourceState : uint8_t {
     FINISHED = 2
 };
 
-//
 enum class SinkState : uint8_t {
     SINK_IDLE = 0, // can send block to sink
     SINK_BUSY = 1, // sink buffer is full, should wait sink to send some block
     FINISHED = 2
 };
 ////////////////       DO NOT USE THE UP State     ////////////////
 
-class OperatorBuilder;
-class Operator;
+class OperatorBuilderBase;
+class OperatorBase;
 
-using OperatorPtr = std::shared_ptr<Operator>;
+using OperatorPtr = std::shared_ptr<OperatorBase>;
 using Operators = std::vector<OperatorPtr>;
 
-class Operator {
+using OperatorBuilderPtr = std::shared_ptr<OperatorBuilderBase>;
+using OperatorBuilders = std::vector<OperatorBuilderPtr>;
+
+class OperatorBuilderBase {
 public:
-    explicit Operator(OperatorBuilder* operator_builder);
-    virtual ~Operator() = default;
+    OperatorBuilderBase(int32_t id, const std::string& name) : _id(id), _name(name) {}
+
+    virtual ~OperatorBuilderBase() = default;
+
+    virtual OperatorPtr build_operator() = 0;
+
+    virtual bool is_sink() const { return false; }
+    virtual bool is_source() const { return false; }
+
+    // create the object used by all operator
+    virtual Status prepare(RuntimeState* state);
+
+    // destory the object used by all operator
+    virtual void close(RuntimeState* state);
+
+    std::string get_name() const { return _name; }
+
+    RuntimeState* runtime_state() { return _state; }
+
+    virtual const RowDescriptor& row_desc() = 0;
+
+    int32_t id() const { return _id; }
+
+protected:
+    const int32_t _id;
+    const std::string _name;
+
+    RuntimeState* _state = nullptr;
+    bool _is_closed = false;
+};
+
+template <typename NodeType>
+class OperatorBuilder : public OperatorBuilderBase {
+public:
+    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
+            : OperatorBuilderBase(id, name), _node(reinterpret_cast<NodeType*>(exec_node)) {}
+
+    virtual ~OperatorBuilder() = default;

Review Comment:
   warning: prefer using 'override' or (rarely) 'final' instead of 'virtual' [modernize-use-override]
   
   ```suggestion
   override 
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] qzsee commented on a diff in pull request #14787: [Refactor](pipeline) Refactor operator and builder code of pipeline

Posted by GitBox <gi...@apache.org>.
qzsee commented on code in PR #14787:
URL: https://github.com/apache/doris/pull/14787#discussion_r1038963578


##########
be/src/pipeline/exec/aggregation_source_operator.cpp:
##########
@@ -22,46 +22,7 @@
 namespace doris {
 namespace pipeline {
 
-AggregationSourceOperator::AggregationSourceOperator(OperatorBuilder* templ,
-                                                     vectorized::AggregationNode* node)
-        : Operator(templ), _agg_node(node) {}
-
-Status AggregationSourceOperator::prepare(RuntimeState* state) {
-    _agg_node->increase_ref();
-    return Status::OK();
-}
-
-bool AggregationSourceOperator::can_read() {
-    return _agg_node->can_read();
-}
-
-Status AggregationSourceOperator::get_block(RuntimeState* state, vectorized::Block* block,
-                                            SourceState& source_state) {
-    SCOPED_TIMER(_runtime_profile->total_time_counter());
-    bool eos = false;
-    RETURN_IF_ERROR(_agg_node->pull(state, block, &eos));
-    source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE;
-    return Status::OK();
-}
-
-Status AggregationSourceOperator::close(RuntimeState* state) {
-    _fresh_exec_timer(_agg_node);
-    if (!_agg_node->decrease_ref()) {
-        _agg_node->release_resource(state);
-    }
-    return Status::OK();
-}
-
-///////////////////////////////  operator template  ////////////////////////////////
-
-AggregationSourceOperatorBuilder::AggregationSourceOperatorBuilder(
-        int32_t id, const std::string& name, vectorized::AggregationNode* exec_node)
-        : OperatorBuilder(id, name, exec_node) {}
-
-OperatorPtr AggregationSourceOperatorBuilder::build_operator() {
-    return std::make_shared<AggregationSourceOperator>(
-            this, assert_cast<vectorized::AggregationNode*>(_related_exec_node));
-}
+OPERATOR_CODE_GENERATOR(AggregationSourceOperator, Operator)

Review Comment:
   Why is it `AggSinkOperator` instead of `AggregationSinkOperator`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org