You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/12/03 07:07:17 UTC

[GitHub] [doris] github-actions[bot] commented on a diff in pull request #14787: [Refactor](pipeline) Refactor operator and builder code of pipeline

github-actions[bot] commented on code in PR #14787:
URL: https://github.com/apache/doris/pull/14787#discussion_r1038741810


##########
be/src/pipeline/exec/operator.cpp:
##########
@@ -19,53 +19,30 @@
 
 namespace doris::pipeline {
 
-Operator::Operator(OperatorBuilder* operator_builder)
+OperatorBase::OperatorBase(OperatorBuilderBase* operator_builder)
         : _operator_builder(operator_builder), _is_closed(false) {}
 
-bool Operator::is_sink() const {
+bool OperatorBase::is_sink() const {
     return _operator_builder->is_sink();
 }
 
-bool Operator::is_source() const {
+bool OperatorBase::is_source() const {
     return _operator_builder->is_source();
 }
 
-Status Operator::init(ExecNode* exec_node, RuntimeState* state) {
-    _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
-    if (exec_node) {
-        exec_node->runtime_profile()->insert_child_head(_runtime_profile.get(), true);
-    }
-    return Status::OK();
-}
-
-Status Operator::prepare(RuntimeState* state) {
-    _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
-                                                _runtime_profile.get());
-    return Status::OK();
-}
-
-Status Operator::open(RuntimeState* state) {
-    return Status::OK();
-}
-
-Status Operator::close(RuntimeState* state) {
+Status OperatorBase::close(RuntimeState* state) {

Review Comment:
   warning: parameter 'state' is unused [misc-unused-parameters]
   
   ```suggestion
   Status OperatorBase::close(RuntimeState*  /*state*/) {
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -96,20 +171,14 @@
     virtual bool can_write() { return false; } // for sink
 
     // for pipeline
-    virtual Status get_block([[maybe_unused]] RuntimeState* runtime_state,
-                             [[maybe_unused]] vectorized::Block* block,
-                             [[maybe_unused]] SourceState& result_state) {
-        std::stringstream error_msg;
-        error_msg << " has not implements get_block";
-        return Status::NotSupported(error_msg.str());
-    }
+    virtual Status get_block(RuntimeState* runtime_state, vectorized::Block* block,
+                             SourceState& result_state) {
+        return Status::OK();

Review Comment:
   warning: parameter 'result_state' is unused [misc-unused-parameters]
   
   ```suggestion
       virtual Status get_block(RuntimeState* runtime_state, vectorized::Block* block,
                                SourceState& /*result_state*/) {
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -96,20 +171,14 @@
     virtual bool can_write() { return false; } // for sink
 
     // for pipeline
-    virtual Status get_block([[maybe_unused]] RuntimeState* runtime_state,
-                             [[maybe_unused]] vectorized::Block* block,
-                             [[maybe_unused]] SourceState& result_state) {
-        std::stringstream error_msg;
-        error_msg << " has not implements get_block";
-        return Status::NotSupported(error_msg.str());
-    }
+    virtual Status get_block(RuntimeState* runtime_state, vectorized::Block* block,
+                             SourceState& result_state) {

Review Comment:
   warning: parameter 'block' is unused [misc-unused-parameters]
   
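   ```suggestion
       virtual Status get_block(RuntimeState* runtime_state, vectorized::Block* /*block*/,
                                SourceState& result_state) {
   ```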
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -33,55 +42,121 @@
     FINISHED = 2
 };
 
-//
 enum class SinkState : uint8_t {
     SINK_IDLE = 0, // can send block to sink
     SINK_BUSY = 1, // sink buffer is full, should wait sink to send some block
     FINISHED = 2
 };
 ////////////////       DO NOT USE THE UP State     ////////////////
 
-class OperatorBuilder;
-class Operator;
+class OperatorBuilderBase;
+class OperatorBase;
 
-using OperatorPtr = std::shared_ptr<Operator>;
+using OperatorPtr = std::shared_ptr<OperatorBase>;
 using Operators = std::vector<OperatorPtr>;
 
-class Operator {
+using OperatorBuilderPtr = std::shared_ptr<OperatorBuilderBase>;
+using OperatorBuilders = std::vector<OperatorBuilderPtr>;
+
+class OperatorBuilderBase {
+public:
+    OperatorBuilderBase(int32_t id, const std::string& name) : _id(id), _name(name) {}
+
+    virtual ~OperatorBuilderBase() = default;
+
+    virtual OperatorPtr build_operator() = 0;
+
+    virtual bool is_sink() const { return false; }
+    virtual bool is_source() const { return false; }
+
+    // create the object used by all operator
+    virtual Status prepare(RuntimeState* state);
+
+    // destory the object used by all operator
+    virtual void close(RuntimeState* state);
+
+    std::string get_name() const { return _name; }
+
+    RuntimeState* runtime_state() { return _state; }
+
+    virtual const RowDescriptor& row_desc() = 0;
+
+    int32_t id() const { return _id; }
+
+protected:
+    const int32_t _id;
+    const std::string _name;
+
+    RuntimeState* _state = nullptr;
+    bool _is_closed = false;
+};
+
+template <typename NodeType>
+class OperatorBuilder : public OperatorBuilderBase {
 public:
-    explicit Operator(OperatorBuilder* operator_builder);
-    virtual ~Operator() = default;
+    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
+            : OperatorBuilderBase(id, name), _node(reinterpret_cast<NodeType*>(exec_node)) {}
+
+    virtual ~OperatorBuilder() override = default;
+
+    const RowDescriptor& row_desc() override { return _node->row_desc(); }
+
+    NodeType* exec_node() const { return _node; }
+
+protected:
+    NodeType* _node;
+};
+
+template <typename SinkType>
+class DataSinkOperatorBuilder : public OperatorBuilderBase {
+public:
+    DataSinkOperatorBuilder(int32_t id, const std::string& name, DataSink* sink = nullptr)
+            : OperatorBuilderBase(id, name), _sink(reinterpret_cast<SinkType*>(sink)) {}
+
+    virtual ~DataSinkOperatorBuilder() override = default;

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       ~DataSinkOperatorBuilder() override = default;
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -33,55 +42,121 @@ enum class SourceState : uint8_t {
     FINISHED = 2
 };
 
-//
 enum class SinkState : uint8_t {
     SINK_IDLE = 0, // can send block to sink
     SINK_BUSY = 1, // sink buffer is full, should wait sink to send some block
     FINISHED = 2
 };
 ////////////////       DO NOT USE THE UP State     ////////////////
 
-class OperatorBuilder;
-class Operator;
+class OperatorBuilderBase;
+class OperatorBase;
 
-using OperatorPtr = std::shared_ptr<Operator>;
+using OperatorPtr = std::shared_ptr<OperatorBase>;
 using Operators = std::vector<OperatorPtr>;
 
-class Operator {
+using OperatorBuilderPtr = std::shared_ptr<OperatorBuilderBase>;
+using OperatorBuilders = std::vector<OperatorBuilderPtr>;
+
+class OperatorBuilderBase {
+public:
+    OperatorBuilderBase(int32_t id, const std::string& name) : _id(id), _name(name) {}
+
+    virtual ~OperatorBuilderBase() = default;
+
+    virtual OperatorPtr build_operator() = 0;
+
+    virtual bool is_sink() const { return false; }
+    virtual bool is_source() const { return false; }
+
+    // create the object used by all operator
+    virtual Status prepare(RuntimeState* state);
+
+    // destory the object used by all operator
+    virtual void close(RuntimeState* state);
+
+    std::string get_name() const { return _name; }
+
+    RuntimeState* runtime_state() { return _state; }
+
+    virtual const RowDescriptor& row_desc() = 0;
+
+    int32_t id() const { return _id; }
+
+protected:
+    const int32_t _id;
+    const std::string _name;
+
+    RuntimeState* _state = nullptr;
+    bool _is_closed = false;
+};
+
+template <typename NodeType>
+class OperatorBuilder : public OperatorBuilderBase {
 public:
-    explicit Operator(OperatorBuilder* operator_builder);
-    virtual ~Operator() = default;
+    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
+            : OperatorBuilderBase(id, name), _node(reinterpret_cast<NodeType*>(exec_node)) {}
+
+    virtual ~OperatorBuilder() override = default;

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       ~OperatorBuilder() override = default;
   ```
   



##########
be/src/pipeline/exec/operator.cpp:
##########
@@ -75,13 +52,13 @@
 
 /////////////////////////////////////// OperatorBuilder ////////////////////////////////////////////////////////////
 
-Status OperatorBuilder::prepare(doris::RuntimeState* state) {
+Status OperatorBuilderBase::prepare(doris::RuntimeState* state) {
     _state = state;
     // runtime filter, now dispose by NewOlapScanNode
     return Status::OK();
 }
 
-void OperatorBuilder::close(doris::RuntimeState* state) {
+void OperatorBuilderBase::close(doris::RuntimeState* state) {

Review Comment:
   warning: parameter 'state' is unused [misc-unused-parameters]
   
   ```suggestion
   void OperatorBuilderBase::close(doris::RuntimeState*  /*state*/) {
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() override = default;
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       Status sink(RuntimeState* state, vectorized::Block* in_block,
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() override = default;
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       Status finalize(RuntimeState* state) override { return Status::OK(); }
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() override = default;
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       Status prepare(RuntimeState* state) override {
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() override = default;
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;
+
+    virtual Status prepare(RuntimeState* state) override {

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       Status prepare(RuntimeState* state) override {
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() override = default;
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }

Review Comment:
   warning: parameter 'state' is unused [misc-unused-parameters]
   
   ```suggestion
       virtual Status finalize(RuntimeState* /*state*/) override { return Status::OK(); }
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() override = default;
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;
+
+    virtual Status prepare(RuntimeState* state) override {
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        _node->increase_ref();
+        return Status::OK();
+    }
+
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        RETURN_IF_ERROR(_node->alloc_resource(state));
+        return Status::OK();
+    }
+
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       Status sink(RuntimeState* state, vectorized::Block* in_block,
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() override = default;
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;
+
+    virtual Status prepare(RuntimeState* state) override {

Review Comment:
   warning: parameter 'state' is unused [misc-unused-parameters]
   
   ```suggestion
       virtual Status prepare(RuntimeState* /*state*/) override {
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() override = default;
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;
+
+    virtual Status prepare(RuntimeState* state) override {
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        _node->increase_ref();
+        return Status::OK();
+    }
+
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        RETURN_IF_ERROR(_node->alloc_resource(state));
+        return Status::OK();
+    }
+
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _node->sink(state, in_block, source_state == SourceState::FINISHED);
+    }
+
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_node);
+        if (!_node->decrease_ref()) {
+            _node->release_resource(state);
+        }
+        return Status::OK();
+    }
+
+    virtual Status get_block(RuntimeState* state, vectorized::Block* block,
+                             SourceState& source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        bool eos = false;
+        RETURN_IF_ERROR(_node->pull(state, block, &eos));
+        source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE;
+        return Status::OK();
+    }
+
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       Status finalize(RuntimeState* state) override { return Status::OK(); }
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() override = default;
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;
+
+    virtual Status prepare(RuntimeState* state) override {
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        _node->increase_ref();
+        return Status::OK();
+    }
+
+    virtual Status open(RuntimeState* state) override {

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       Status open(RuntimeState* state) override {
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() override = default;
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;
+
+    virtual Status prepare(RuntimeState* state) override {
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        _node->increase_ref();
+        return Status::OK();
+    }
+
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        RETURN_IF_ERROR(_node->alloc_resource(state));
+        return Status::OK();
+    }
+
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _node->sink(state, in_block, source_state == SourceState::FINISHED);
+    }
+
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_node);
+        if (!_node->decrease_ref()) {
+            _node->release_resource(state);
+        }
+        return Status::OK();
+    }
+
+    virtual Status get_block(RuntimeState* state, vectorized::Block* block,
+                             SourceState& source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        bool eos = false;
+        RETURN_IF_ERROR(_node->pull(state, block, &eos));
+        source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE;
+        return Status::OK();
+    }
+
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }

Review Comment:
   warning: parameter 'state' is unused [misc-unused-parameters]
   
   ```suggestion
       virtual Status finalize(RuntimeState* /*state*/) override { return Status::OK(); }
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() override = default;
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;
+
+    virtual Status prepare(RuntimeState* state) override {
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        _node->increase_ref();
+        return Status::OK();
+    }
+
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        RETURN_IF_ERROR(_node->alloc_resource(state));
+        return Status::OK();
+    }
+
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _node->sink(state, in_block, source_state == SourceState::FINISHED);
+    }
+
+    virtual Status close(RuntimeState* state) override {

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       Status close(RuntimeState* state) override {
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() override = default;
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;
+
+    virtual Status prepare(RuntimeState* state) override {
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        _node->increase_ref();
+        return Status::OK();
+    }
+
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        RETURN_IF_ERROR(_node->alloc_resource(state));
+        return Status::OK();
+    }
+
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _node->sink(state, in_block, source_state == SourceState::FINISHED);
+    }
+
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_node);
+        if (!_node->decrease_ref()) {
+            _node->release_resource(state);
+        }
+        return Status::OK();
+    }
+
+    virtual Status get_block(RuntimeState* state, vectorized::Block* block,
+                             SourceState& source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        bool eos = false;
+        RETURN_IF_ERROR(_node->pull(state, block, &eos));
+        source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE;
+        return Status::OK();
+    }
+
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+    virtual bool can_read() override { return _node->can_read(); }

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       bool can_read() override { return _node->can_read(); }
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() override = default;

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       ~DataSinkOperator() override = default;
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() override = default;
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       Status close(RuntimeState* state) override {
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -96,20 +171,14 @@
     virtual bool can_write() { return false; } // for sink
 
     // for pipeline
-    virtual Status get_block([[maybe_unused]] RuntimeState* runtime_state,
-                             [[maybe_unused]] vectorized::Block* block,
-                             [[maybe_unused]] SourceState& result_state) {
-        std::stringstream error_msg;
-        error_msg << " has not implements get_block";
-        return Status::NotSupported(error_msg.str());
-    }
+    virtual Status get_block(RuntimeState* runtime_state, vectorized::Block* block,

Review Comment:
   warning: parameter 'runtime_state' is unused [misc-unused-parameters]
   
   ```suggestion
       virtual Status get_block(RuntimeState* /*runtime_state*/, vectorized::Block* block,
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() override = default;
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       Status open(RuntimeState* state) override {
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() override = default;
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       ~Operator() override = default;
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() override = default;
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;
+
+    virtual Status prepare(RuntimeState* state) override {
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        _node->increase_ref();
+        return Status::OK();
+    }
+
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        RETURN_IF_ERROR(_node->alloc_resource(state));
+        return Status::OK();
+    }
+
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _node->sink(state, in_block, source_state == SourceState::FINISHED);
+    }
+
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_node);
+        if (!_node->decrease_ref()) {
+            _node->release_resource(state);
+        }
+        return Status::OK();
+    }
+
+    virtual Status get_block(RuntimeState* state, vectorized::Block* block,

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       Status get_block(RuntimeState* state, vectorized::Block* block,
   ```
   



##########
be/src/pipeline/exec/operator.h:
##########
@@ -155,44 +222,163 @@
     bool _is_closed = false;
 };
 
-class OperatorBuilder {
+template <typename OperatorBuilderType>
+class DataSinkOperator : public OperatorBase {
 public:
-    OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
-            : _id(id), _name(name), _related_exec_node(exec_node) {}
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    virtual ~OperatorBuilder() = default;
+    DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
+            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
 
-    virtual OperatorPtr build_operator() = 0;
+    virtual ~DataSinkOperator() override = default;
 
-    virtual bool is_sink() const { return false; }
-    virtual bool is_source() const { return false; }
+    virtual Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(_sink->prepare(state));
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _sink->profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        return Status::OK();
+    }
 
-    // create the object used by all operator
-    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _sink->open(state);
+    }
 
-    // destory the object used by all operator
-    virtual void close(RuntimeState* state);
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        if (!UNLIKELY(in_block)) {
+            DCHECK(source_state == SourceState::FINISHED)
+                    << "block is null, eos should invoke in finalize.";
+            return Status::OK();
+        }
+        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+    }
 
-    std::string get_name() const { return _name; }
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_sink);
+        return _sink->close(state, Status::OK());
+    }
 
-    RuntimeState* runtime_state() { return _state; }
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+protected:
+    void _fresh_exec_timer(NodeType* node) {
+        node->profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); }
+    NodeType* _sink;
+};
+
+template <typename OperatorBuilderType>
+class Operator : public OperatorBase {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    ExecNode* exec_node() const { return _related_exec_node; }
+    Operator(OperatorBuilderBase* builder, ExecNode* node)
+            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
 
-    int32_t id() const { return _id; }
+    virtual ~Operator() override = default;
+
+    virtual Status prepare(RuntimeState* state) override {
+        _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
+        _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true);
+        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+                                                    _runtime_profile.get());
+        _node->increase_ref();
+        return Status::OK();
+    }
+
+    virtual Status open(RuntimeState* state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        RETURN_IF_ERROR(_node->alloc_resource(state));
+        return Status::OK();
+    }
+
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
+                        SourceState source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        return _node->sink(state, in_block, source_state == SourceState::FINISHED);
+    }
+
+    virtual Status close(RuntimeState* state) override {
+        _fresh_exec_timer(_node);
+        if (!_node->decrease_ref()) {
+            _node->release_resource(state);
+        }
+        return Status::OK();
+    }
+
+    virtual Status get_block(RuntimeState* state, vectorized::Block* block,
+                             SourceState& source_state) override {
+        SCOPED_TIMER(_runtime_profile->total_time_counter());
+        bool eos = false;
+        RETURN_IF_ERROR(_node->pull(state, block, &eos));
+        source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE;
+        return Status::OK();
+    }
+
+    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+
+    virtual bool can_read() override { return _node->can_read(); }
 
 protected:
-    const int32_t _id;
-    const std::string _name;
-    ExecNode* _related_exec_node;
+    void _fresh_exec_timer(NodeType* node) {
+        node->runtime_profile()->total_time_counter()->update(
+                _runtime_profile->total_time_counter()->value());
+    }
 
-    RuntimeState* _state = nullptr;
-    bool _is_closed = false;
+    NodeType* _node;
 };
 
-using OperatorBuilderPtr = std::shared_ptr<OperatorBuilder>;
-using OperatorBuilders = std::vector<OperatorBuilderPtr>;
+template <typename OperatorBuilderType>
+class DataStateOperator : public Operator<OperatorBuilderType> {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
+
+    DataStateOperator(OperatorBuilderBase* builder, ExecNode* node)
+            : Operator<OperatorBuilderType>(builder, node),
+              _child_block(new vectorized::Block),
+              _child_source_state(SourceState::DEPEND_ON_SOURCE) {};
+
+    virtual ~DataStateOperator() override = default;

Review Comment:
   warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override]
   
   ```suggestion
       ~DataStateOperator() override = default;
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org