You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ga...@apache.org on 2022/12/11 13:31:40 UTC

[doris] branch master updated: [Pipeline](select node) Support select node on pipeline engine (#14928)

This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7fb695b51d [Pipeline](select node) Support select node on pipeline engine (#14928)
7fb695b51d is described below

commit 7fb695b51d0c21bf9265b9458736a304286f1765
Author: Gabriel <ga...@gmail.com>
AuthorDate: Sun Dec 11 21:31:32 2022 +0800

    [Pipeline](select node) Support select node on pipeline engine (#14928)
---
 be/src/pipeline/CMakeLists.txt                     |  3 +-
 be/src/pipeline/exec/aggregation_sink_operator.cpp |  2 +-
 be/src/pipeline/exec/aggregation_sink_operator.h   |  2 +-
 .../pipeline/exec/aggregation_source_operator.cpp  |  2 +-
 be/src/pipeline/exec/aggregation_source_operator.h |  2 +-
 be/src/pipeline/exec/analytic_sink_operator.cpp    |  2 +-
 be/src/pipeline/exec/analytic_sink_operator.h      |  2 +-
 be/src/pipeline/exec/analytic_source_operator.cpp  |  2 +-
 be/src/pipeline/exec/analytic_source_operator.h    |  2 +-
 be/src/pipeline/exec/assert_num_rows_operator.h    |  4 +-
 be/src/pipeline/exec/const_value_operator.h        |  4 +-
 be/src/pipeline/exec/datagen_operator.cpp          |  6 +--
 be/src/pipeline/exec/datagen_operator.h            |  2 +-
 be/src/pipeline/exec/empty_set_operator.cpp        |  2 +-
 be/src/pipeline/exec/empty_set_operator.h          |  2 +-
 be/src/pipeline/exec/exchange_source_operator.cpp  |  2 +-
 be/src/pipeline/exec/exchange_source_operator.h    |  2 +-
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |  2 +-
 be/src/pipeline/exec/hashjoin_build_sink.h         |  2 +-
 be/src/pipeline/exec/mysql_scan_operator.cpp       |  6 +--
 be/src/pipeline/exec/mysql_scan_operator.h         |  2 +-
 .../exec/nested_loop_join_build_operator.cpp       |  2 +-
 .../exec/nested_loop_join_build_operator.h         |  2 +-
 be/src/pipeline/exec/operator.h                    | 54 ++++++++++++++++++----
 be/src/pipeline/exec/scan_operator.cpp             |  6 +--
 be/src/pipeline/exec/scan_operator.h               |  2 +-
 be/src/pipeline/exec/schema_scan_operator.cpp      |  4 +-
 be/src/pipeline/exec/schema_scan_operator.h        |  2 +-
 ...lytic_sink_operator.cpp => select_operator.cpp} |  7 +--
 .../{hashjoin_build_sink.h => select_operator.h}   | 15 +++---
 be/src/pipeline/exec/set_probe_sink_operator.cpp   |  2 +-
 be/src/pipeline/exec/set_probe_sink_operator.h     |  2 +-
 be/src/pipeline/exec/set_sink_operator.cpp         |  2 +-
 be/src/pipeline/exec/set_sink_operator.h           |  2 +-
 be/src/pipeline/exec/set_source_operator.cpp       |  2 +-
 be/src/pipeline/exec/set_source_operator.h         |  2 +-
 be/src/pipeline/exec/sort_sink_operator.cpp        |  2 +-
 be/src/pipeline/exec/sort_sink_operator.h          |  2 +-
 be/src/pipeline/exec/sort_source_operator.cpp      |  2 +-
 be/src/pipeline/exec/sort_source_operator.h        |  2 +-
 .../exec/streaming_aggregation_sink_operator.cpp   |  8 ++--
 .../exec/streaming_aggregation_sink_operator.h     |  2 +-
 .../exec/streaming_aggregation_source_operator.cpp |  2 +-
 .../exec/streaming_aggregation_source_operator.h   |  2 +-
 be/src/pipeline/pipeline_fragment_context.cpp      |  8 ++++
 be/src/vec/exec/vselect_node.cpp                   | 12 ++++-
 be/src/vec/exec/vselect_node.h                     | 15 +++---
 47 files changed, 134 insertions(+), 84 deletions(-)

diff --git a/be/src/pipeline/CMakeLists.txt b/be/src/pipeline/CMakeLists.txt
index bca5816c2b..31531fcbc5 100644
--- a/be/src/pipeline/CMakeLists.txt
+++ b/be/src/pipeline/CMakeLists.txt
@@ -53,7 +53,8 @@ set(PIPELINE_FILES
         exec/nested_loop_join_probe_operator.cpp
         exec/set_sink_operator.cpp
         exec/set_source_operator.cpp
-        exec/set_probe_sink_operator.cpp)
+        exec/set_probe_sink_operator.cpp
+        exec/select_operator.cpp)
 
 add_library(Pipeline STATIC
         ${PIPELINE_FILES}
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index ae1205ce1a..ce53e4d6a4 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -21,6 +21,6 @@
 
 namespace doris::pipeline {
 
-OPERATOR_CODE_GENERATOR(AggSinkOperator, Operator)
+OPERATOR_CODE_GENERATOR(AggSinkOperator, StreamingOperator)
 
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h
index 21d9ad7d20..7b5c4ee2f6 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -37,7 +37,7 @@ public:
     bool is_sink() const override { return true; };
 };
 
-class AggSinkOperator final : public Operator<AggSinkOperatorBuilder> {
+class AggSinkOperator final : public StreamingOperator<AggSinkOperatorBuilder> {
 public:
     AggSinkOperator(OperatorBuilderBase* operator_builder, ExecNode* node);
     bool can_write() override { return true; };
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp
index daecafdd0e..e3b2aa1f23 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -22,7 +22,7 @@
 namespace doris {
 namespace pipeline {
 
-OPERATOR_CODE_GENERATOR(AggSourceOperator, Operator)
+OPERATOR_CODE_GENERATOR(AggSourceOperator, SourceOperator)
 
 } // namespace pipeline
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/exec/aggregation_source_operator.h b/be/src/pipeline/exec/aggregation_source_operator.h
index bd3cbd9cd4..04c28abd40 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.h
+++ b/be/src/pipeline/exec/aggregation_source_operator.h
@@ -34,7 +34,7 @@ public:
     OperatorPtr build_operator() override;
 };
 
-class AggSourceOperator final : public Operator<AggSourceOperatorBuilder> {
+class AggSourceOperator final : public SourceOperator<AggSourceOperatorBuilder> {
 public:
     AggSourceOperator(OperatorBuilderBase*, ExecNode*);
     // if exec node split to: sink, source operator. the source operator
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp
index 0e06e517d9..4a17359ea7 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -20,6 +20,6 @@
 
 namespace doris::pipeline {
 
-OPERATOR_CODE_GENERATOR(AnalyticSinkOperator, Operator)
+OPERATOR_CODE_GENERATOR(AnalyticSinkOperator, StreamingOperator)
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h
index b330b37637..c786ffb9d7 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -36,7 +36,7 @@ public:
     bool is_sink() const override { return true; };
 };
 
-class AnalyticSinkOperator final : public Operator<AnalyticSinkOperatorBuilder> {
+class AnalyticSinkOperator final : public StreamingOperator<AnalyticSinkOperatorBuilder> {
 public:
     AnalyticSinkOperator(OperatorBuilderBase* operator_builder, ExecNode* node);
 
diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp b/be/src/pipeline/exec/analytic_source_operator.cpp
index 7f0194a62f..9c9717d24a 100644
--- a/be/src/pipeline/exec/analytic_source_operator.cpp
+++ b/be/src/pipeline/exec/analytic_source_operator.cpp
@@ -19,6 +19,6 @@
 
 namespace doris::pipeline {
 
-OPERATOR_CODE_GENERATOR(AnalyticSourceOperator, Operator)
+OPERATOR_CODE_GENERATOR(AnalyticSourceOperator, SourceOperator)
 
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/analytic_source_operator.h b/be/src/pipeline/exec/analytic_source_operator.h
index 0d25bb850b..34fd9cdfde 100644
--- a/be/src/pipeline/exec/analytic_source_operator.h
+++ b/be/src/pipeline/exec/analytic_source_operator.h
@@ -37,7 +37,7 @@ public:
     OperatorPtr build_operator() override;
 };
 
-class AnalyticSourceOperator final : public Operator<AnalyticSourceOperatorBuilder> {
+class AnalyticSourceOperator final : public SourceOperator<AnalyticSourceOperatorBuilder> {
 public:
     AnalyticSourceOperator(OperatorBuilderBase*, ExecNode*);
 
diff --git a/be/src/pipeline/exec/assert_num_rows_operator.h b/be/src/pipeline/exec/assert_num_rows_operator.h
index 60bbc7c49c..40acfd05f8 100644
--- a/be/src/pipeline/exec/assert_num_rows_operator.h
+++ b/be/src/pipeline/exec/assert_num_rows_operator.h
@@ -32,10 +32,10 @@ public:
     OperatorPtr build_operator() override;
 };
 
-class AssertNumRowsOperator final : public Operator<AssertNumRowsOperatorBuilder> {
+class AssertNumRowsOperator final : public StreamingOperator<AssertNumRowsOperatorBuilder> {
 public:
     AssertNumRowsOperator(OperatorBuilderBase* operator_builder, ExecNode* node)
-            : Operator(operator_builder, node) {};
+            : StreamingOperator(operator_builder, node) {};
 };
 
 OperatorPtr AssertNumRowsOperatorBuilder::build_operator() {
diff --git a/be/src/pipeline/exec/const_value_operator.h b/be/src/pipeline/exec/const_value_operator.h
index 838005d2f1..467f5ded05 100644
--- a/be/src/pipeline/exec/const_value_operator.h
+++ b/be/src/pipeline/exec/const_value_operator.h
@@ -36,10 +36,10 @@ public:
     bool is_source() const override { return true; }
 };
 
-class ConstValueOperator final : public Operator<ConstValueOperatorBuilder> {
+class ConstValueOperator final : public SourceOperator<ConstValueOperatorBuilder> {
 public:
     ConstValueOperator(OperatorBuilderBase* operator_builder, ExecNode* node)
-            : Operator(operator_builder, node) {};
+            : SourceOperator(operator_builder, node) {};
 
     bool can_read() override { return true; };
 };
diff --git a/be/src/pipeline/exec/datagen_operator.cpp b/be/src/pipeline/exec/datagen_operator.cpp
index 729096f803..fc4a32342c 100644
--- a/be/src/pipeline/exec/datagen_operator.cpp
+++ b/be/src/pipeline/exec/datagen_operator.cpp
@@ -21,16 +21,16 @@
 
 namespace doris::pipeline {
 
-OPERATOR_CODE_GENERATOR(DataGenOperator, Operator)
+OPERATOR_CODE_GENERATOR(DataGenOperator, SourceOperator)
 
 Status DataGenOperator::open(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
-    RETURN_IF_ERROR(Operator::open(state));
+    RETURN_IF_ERROR(SourceOperator::open(state));
     return _node->open(state);
 }
 
 Status DataGenOperator::close(RuntimeState* state) {
-    RETURN_IF_ERROR(Operator::close(state));
+    RETURN_IF_ERROR(SourceOperator::close(state));
     _node->close(state);
     return Status::OK();
 }
diff --git a/be/src/pipeline/exec/datagen_operator.h b/be/src/pipeline/exec/datagen_operator.h
index 799fa3c776..27933caf26 100644
--- a/be/src/pipeline/exec/datagen_operator.h
+++ b/be/src/pipeline/exec/datagen_operator.h
@@ -32,7 +32,7 @@ public:
     OperatorPtr build_operator() override;
 };
 
-class DataGenOperator : public Operator<DataGenOperatorBuilder> {
+class DataGenOperator : public SourceOperator<DataGenOperatorBuilder> {
 public:
     DataGenOperator(OperatorBuilderBase* operator_builder, ExecNode* datagen_node);
 
diff --git a/be/src/pipeline/exec/empty_set_operator.cpp b/be/src/pipeline/exec/empty_set_operator.cpp
index 2f607080d5..be1178e9a6 100644
--- a/be/src/pipeline/exec/empty_set_operator.cpp
+++ b/be/src/pipeline/exec/empty_set_operator.cpp
@@ -21,6 +21,6 @@
 
 namespace doris::pipeline {
 
-OPERATOR_CODE_GENERATOR(EmptySetSourceOperator, Operator)
+OPERATOR_CODE_GENERATOR(EmptySetSourceOperator, SourceOperator)
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/empty_set_operator.h b/be/src/pipeline/exec/empty_set_operator.h
index 8cffcd2783..f0dc39e1fd 100644
--- a/be/src/pipeline/exec/empty_set_operator.h
+++ b/be/src/pipeline/exec/empty_set_operator.h
@@ -33,7 +33,7 @@ public:
     OperatorPtr build_operator() override;
 };
 
-class EmptySetSourceOperator final : public Operator<EmptySetSourceOperatorBuilder> {
+class EmptySetSourceOperator final : public SourceOperator<EmptySetSourceOperatorBuilder> {
 public:
     EmptySetSourceOperator(OperatorBuilderBase* operator_builder, ExecNode* empty_set_node);
     bool can_read() override { return true; };
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp
index 31bed810cb..88df3c57c7 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -23,7 +23,7 @@
 
 namespace doris::pipeline {
 
-OPERATOR_CODE_GENERATOR(ExchangeSourceOperator, Operator)
+OPERATOR_CODE_GENERATOR(ExchangeSourceOperator, SourceOperator)
 
 bool ExchangeSourceOperator::can_read() {
     return _node->_stream_recvr->ready_to_read();
diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h
index 273d6664c5..b6143a86ab 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -34,7 +34,7 @@ public:
     OperatorPtr build_operator() override;
 };
 
-class ExchangeSourceOperator final : public Operator<ExchangeSourceOperatorBuilder> {
+class ExchangeSourceOperator final : public SourceOperator<ExchangeSourceOperatorBuilder> {
 public:
     ExchangeSourceOperator(OperatorBuilderBase*, ExecNode*);
     bool can_read() override;
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 6ac084fce5..73fdc69252 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -21,6 +21,6 @@
 
 namespace doris::pipeline {
 
-OPERATOR_CODE_GENERATOR(HashJoinBuildSink, Operator)
+OPERATOR_CODE_GENERATOR(HashJoinBuildSink, StreamingOperator)
 
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h
index aa7cbf22bb..404b8e53ad 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -36,7 +36,7 @@ public:
     bool is_sink() const override { return true; };
 };
 
-class HashJoinBuildSink final : public Operator<HashJoinBuildSinkBuilder> {
+class HashJoinBuildSink final : public StreamingOperator<HashJoinBuildSinkBuilder> {
 public:
     HashJoinBuildSink(OperatorBuilderBase* operator_builder, ExecNode* node);
     bool can_write() override { return true; };
diff --git a/be/src/pipeline/exec/mysql_scan_operator.cpp b/be/src/pipeline/exec/mysql_scan_operator.cpp
index 575d272c33..e62502ae17 100644
--- a/be/src/pipeline/exec/mysql_scan_operator.cpp
+++ b/be/src/pipeline/exec/mysql_scan_operator.cpp
@@ -21,16 +21,16 @@
 
 namespace doris::pipeline {
 
-OPERATOR_CODE_GENERATOR(MysqlScanOperator, Operator)
+OPERATOR_CODE_GENERATOR(MysqlScanOperator, SourceOperator)
 
 Status MysqlScanOperator::open(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
-    RETURN_IF_ERROR(Operator::open(state));
+    RETURN_IF_ERROR(SourceOperator::open(state));
     return _node->open(state);
 }
 
 Status MysqlScanOperator::close(RuntimeState* state) {
-    RETURN_IF_ERROR(Operator::close(state));
+    RETURN_IF_ERROR(SourceOperator::close(state));
     _node->close(state);
     return Status::OK();
 }
diff --git a/be/src/pipeline/exec/mysql_scan_operator.h b/be/src/pipeline/exec/mysql_scan_operator.h
index 798b698691..6ff5b5217e 100644
--- a/be/src/pipeline/exec/mysql_scan_operator.h
+++ b/be/src/pipeline/exec/mysql_scan_operator.h
@@ -29,7 +29,7 @@ public:
     OperatorPtr build_operator() override;
 };
 
-class MysqlScanOperator : public Operator<MysqlScanOperatorBuilder> {
+class MysqlScanOperator : public SourceOperator<MysqlScanOperatorBuilder> {
 public:
     MysqlScanOperator(OperatorBuilderBase* operator_builder, ExecNode* mysql_scan_node);
 
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
index 2467b9a8b9..5e3245b983 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
@@ -21,6 +21,6 @@
 
 namespace doris::pipeline {
 
-OPERATOR_CODE_GENERATOR(NestLoopJoinBuildOperator, Operator)
+OPERATOR_CODE_GENERATOR(NestLoopJoinBuildOperator, StreamingOperator)
 
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h b/be/src/pipeline/exec/nested_loop_join_build_operator.h
index 14d4f7108d..e5c9f3f499 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.h
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h
@@ -37,7 +37,7 @@ public:
     bool is_sink() const override { return true; };
 };
 
-class NestLoopJoinBuildOperator final : public Operator<NestLoopJoinBuildOperatorBuilder> {
+class NestLoopJoinBuildOperator final : public StreamingOperator<NestLoopJoinBuildOperatorBuilder> {
 public:
     NestLoopJoinBuildOperator(OperatorBuilderBase* operator_builder, ExecNode* node);
     bool can_write() override { return true; };
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index a17e8dd1f4..68870fc57b 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -24,6 +24,8 @@
 #include "runtime/runtime_state.h"
 #include "vec/core/block.h"
 #include "vec/exec/vdata_gen_scan_node.h"
+#include "vec/exec/vselect_node.h"
+#include "vec/exec/vunion_node.h"
 
 #define OPERATOR_CODE_GENERATOR(NAME, SUBCLASS)                                                 \
     NAME##Builder::NAME##Builder(int32_t id, ExecNode* exec_node)                               \
@@ -143,6 +145,8 @@ public:
     explicit OperatorBase(OperatorBuilderBase* operator_builder);
     virtual ~OperatorBase() = default;
 
+    virtual std::string get_name() const = 0;
+
     bool is_sink() const;
 
     bool is_source() const;
@@ -254,11 +258,13 @@ public:
 
     ~DataSinkOperator() override = default;
 
+    std::string get_name() const override { return "DataSinkOperator"; }
+
     Status prepare(RuntimeState* state) override {
         RETURN_IF_ERROR(_sink->prepare(state));
         _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
         _sink->profile()->insert_child_head(_runtime_profile.get(), true);
-        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+        _mem_tracker = std::make_unique<MemTracker>("DataSinkOperator:" + _runtime_profile->name(),
                                                     _runtime_profile.get());
         return Status::OK();
     }
@@ -297,20 +303,22 @@ protected:
  * All operators inherited from Operator will hold a ExecNode inside.
  */
 template <typename OperatorBuilderType>
-class Operator : public OperatorBase {
+class StreamingOperator : public OperatorBase {
 public:
     using NodeType =
             std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
-    Operator(OperatorBuilderBase* builder, ExecNode* node)
+    StreamingOperator(OperatorBuilderBase* builder, ExecNode* node)
             : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
 
-    ~Operator() override = default;
+    ~StreamingOperator() override = default;
+
+    std::string get_name() const override { return "StreamingOperator"; }
 
     Status prepare(RuntimeState* state) override {
         _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
         _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true);
-        _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+        _mem_tracker = std::make_unique<MemTracker>(get_name() + ": " + _runtime_profile->name(),
                                                     _runtime_profile.get());
         _node->increase_ref();
         return Status::OK();
@@ -339,9 +347,10 @@ public:
     Status get_block(RuntimeState* state, vectorized::Block* block,
                      SourceState& source_state) override {
         SCOPED_TIMER(_runtime_profile->total_time_counter());
+        DCHECK(_child);
+        RETURN_IF_ERROR(_child->get_block(state, block, source_state));
         bool eos = false;
         RETURN_IF_ERROR(_node->pull(state, block, &eos));
-        source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE;
         return Status::OK();
     }
 
@@ -358,6 +367,29 @@ protected:
     NodeType* _node;
 };
 
+template <typename OperatorBuilderType>
+class SourceOperator : public StreamingOperator<OperatorBuilderType> {
+public:
+    using NodeType =
+            std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
+
+    SourceOperator(OperatorBuilderBase* builder, ExecNode* node)
+            : StreamingOperator<OperatorBuilderType>(builder, node) {};
+
+    ~SourceOperator() override = default;
+
+    std::string get_name() const override { return "SourceOperator"; }
+
+    Status get_block(RuntimeState* state, vectorized::Block* block,
+                     SourceState& source_state) override {
+        auto& node = StreamingOperator<OperatorBuilderType>::_node;
+        bool eos = false;
+        RETURN_IF_ERROR(node->pull(state, block, &eos));
+        source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE;
+        return Status::OK();
+    }
+};
+
 /**
  * StatefulOperator indicates the operators with some states inside.
  *
@@ -367,22 +399,24 @@ protected:
  * In a nutshell, it is a one-to-many relation between input blocks and output blocks for StatefulOperator.
  */
 template <typename OperatorBuilderType>
-class StatefulOperator : public Operator<OperatorBuilderType> {
+class StatefulOperator : public StreamingOperator<OperatorBuilderType> {
 public:
     using NodeType =
             std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
 
     StatefulOperator(OperatorBuilderBase* builder, ExecNode* node)
-            : Operator<OperatorBuilderType>(builder, node),
+            : StreamingOperator<OperatorBuilderType>(builder, node),
               _child_block(new vectorized::Block),
               _child_source_state(SourceState::DEPEND_ON_SOURCE) {};
 
     virtual ~StatefulOperator() = default;
 
+    std::string get_name() const override { return "DataStateOperator"; }
+
     Status get_block(RuntimeState* state, vectorized::Block* block,
                      SourceState& source_state) override {
-        auto& node = Operator<OperatorBuilderType>::_node;
-        auto& child = Operator<OperatorBuilderType>::_child;
+        auto& node = StreamingOperator<OperatorBuilderType>::_node;
+        auto& child = StreamingOperator<OperatorBuilderType>::_child;
 
         if (node->need_more_input_data()) {
             RETURN_IF_ERROR(child->get_block(state, _child_block.get(), _child_source_state));
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index bbf63954b7..f28c7284fe 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -22,11 +22,11 @@
 
 namespace doris::pipeline {
 
-OPERATOR_CODE_GENERATOR(ScanOperator, Operator)
+OPERATOR_CODE_GENERATOR(ScanOperator, SourceOperator)
 
 Status ScanOperator::open(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
-    RETURN_IF_ERROR(Operator::open(state));
+    RETURN_IF_ERROR(SourceOperator::open(state));
     return _node->open(state);
 }
 
@@ -48,7 +48,7 @@ bool ScanOperator::is_pending_finish() const {
 }
 
 Status ScanOperator::close(RuntimeState* state) {
-    RETURN_IF_ERROR(Operator::close(state));
+    RETURN_IF_ERROR(SourceOperator::close(state));
     _node->close(state);
     return Status::OK();
 }
diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h
index 14292b141b..964625fa7d 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -34,7 +34,7 @@ public:
     OperatorPtr build_operator() override;
 };
 
-class ScanOperator : public Operator<ScanOperatorBuilder> {
+class ScanOperator : public SourceOperator<ScanOperatorBuilder> {
 public:
     ScanOperator(OperatorBuilderBase* operator_builder, ExecNode* scan_node);
 
diff --git a/be/src/pipeline/exec/schema_scan_operator.cpp b/be/src/pipeline/exec/schema_scan_operator.cpp
index 7b887b084d..497d425bb8 100644
--- a/be/src/pipeline/exec/schema_scan_operator.cpp
+++ b/be/src/pipeline/exec/schema_scan_operator.cpp
@@ -21,7 +21,7 @@
 
 namespace doris::pipeline {
 
-OPERATOR_CODE_GENERATOR(SchemaScanOperator, Operator)
+OPERATOR_CODE_GENERATOR(SchemaScanOperator, SourceOperator)
 
 Status SchemaScanOperator::open(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
@@ -29,7 +29,7 @@ Status SchemaScanOperator::open(RuntimeState* state) {
 }
 
 Status SchemaScanOperator::close(RuntimeState* state) {
-    RETURN_IF_ERROR(Operator::close(state));
+    RETURN_IF_ERROR(SourceOperator::close(state));
     _node->close(state);
     return Status::OK();
 }
diff --git a/be/src/pipeline/exec/schema_scan_operator.h b/be/src/pipeline/exec/schema_scan_operator.h
index 9684bde68d..34b56e3b13 100644
--- a/be/src/pipeline/exec/schema_scan_operator.h
+++ b/be/src/pipeline/exec/schema_scan_operator.h
@@ -32,7 +32,7 @@ public:
     OperatorPtr build_operator() override;
 };
 
-class SchemaScanOperator : public Operator<SchemaScanOperatorBuilder> {
+class SchemaScanOperator : public SourceOperator<SchemaScanOperatorBuilder> {
 public:
     SchemaScanOperator(OperatorBuilderBase* operator_builder, ExecNode* scan_node);
 
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/select_operator.cpp
similarity index 87%
copy from be/src/pipeline/exec/analytic_sink_operator.cpp
copy to be/src/pipeline/exec/select_operator.cpp
index 0e06e517d9..c629c29978 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/select_operator.cpp
@@ -1,4 +1,3 @@
-
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -16,10 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "analytic_sink_operator.h"
+#include "select_operator.h"
+
+#include "vec/exec/vselect_node.h"
 
 namespace doris::pipeline {
 
-OPERATOR_CODE_GENERATOR(AnalyticSinkOperator, Operator)
+OPERATOR_CODE_GENERATOR(SelectOperator, StreamingOperator)
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/select_operator.h
similarity index 70%
copy from be/src/pipeline/exec/hashjoin_build_sink.h
copy to be/src/pipeline/exec/select_operator.h
index aa7cbf22bb..d71cb319cf 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/select_operator.h
@@ -21,26 +21,23 @@
 
 namespace doris {
 namespace vectorized {
-class HashJoinNode;
+class VSelectNode;
 class VExprContext;
 class Block;
 } // namespace vectorized
-
 namespace pipeline {
 
-class HashJoinBuildSinkBuilder final : public OperatorBuilder<vectorized::HashJoinNode> {
+class SelectOperatorBuilder final : public OperatorBuilder<vectorized::VSelectNode> {
 public:
-    HashJoinBuildSinkBuilder(int32_t, ExecNode*);
+    SelectOperatorBuilder(int32_t id, ExecNode* select_node);
 
     OperatorPtr build_operator() override;
-    bool is_sink() const override { return true; };
 };
 
-class HashJoinBuildSink final : public Operator<HashJoinBuildSinkBuilder> {
+class SelectOperator final : public StreamingOperator<SelectOperatorBuilder> {
 public:
-    HashJoinBuildSink(OperatorBuilderBase* operator_builder, ExecNode* node);
-    bool can_write() override { return true; };
+    SelectOperator(OperatorBuilderBase* operator_builder, ExecNode* select_node);
 };
 
 } // namespace pipeline
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp b/be/src/pipeline/exec/set_probe_sink_operator.cpp
index 2f51edd12a..705161725b 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp
@@ -35,7 +35,7 @@ OperatorPtr SetProbeSinkOperatorBuilder<is_intersect>::build_operator() {
 template <bool is_intersect>
 SetProbeSinkOperator<is_intersect>::SetProbeSinkOperator(OperatorBuilderBase* operator_builder,
                                                          int child_id, ExecNode* set_node)
-        : Operator<SetProbeSinkOperatorBuilder<is_intersect>>(operator_builder, set_node),
+        : StreamingOperator<SetProbeSinkOperatorBuilder<is_intersect>>(operator_builder, set_node),
           _child_id(child_id) {}
 
 template <bool is_intersect>
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h
index fab23dd8c1..2fbed5e990 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -46,7 +46,7 @@ private:
 };
 
 template <bool is_intersect>
-class SetProbeSinkOperator : public Operator<SetProbeSinkOperatorBuilder<is_intersect>> {
+class SetProbeSinkOperator : public StreamingOperator<SetProbeSinkOperatorBuilder<is_intersect>> {
 public:
     SetProbeSinkOperator(OperatorBuilderBase* operator_builder, int child_id, ExecNode* set_node);
 
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp
index aaa85c31f1..ea85b4f54e 100644
--- a/be/src/pipeline/exec/set_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -34,7 +34,7 @@ OperatorPtr SetSinkOperatorBuilder<is_intersect>::build_operator() {
 template <bool is_intersect>
 SetSinkOperator<is_intersect>::SetSinkOperator(
         OperatorBuilderBase* builder, vectorized::VSetOperationNode<is_intersect>* set_node)
-        : Operator<SetSinkOperatorBuilder<is_intersect>>(builder, set_node) {}
+        : StreamingOperator<SetSinkOperatorBuilder<is_intersect>>(builder, set_node) {}
 
 template class SetSinkOperatorBuilder<true>;
 template class SetSinkOperatorBuilder<false>;
diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h
index 2f45b04e35..d729666ee9 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -43,7 +43,7 @@ public:
 };
 
 template <bool is_intersect>
-class SetSinkOperator : public Operator<SetSinkOperatorBuilder<is_intersect>> {
+class SetSinkOperator : public StreamingOperator<SetSinkOperatorBuilder<is_intersect>> {
 public:
     SetSinkOperator(OperatorBuilderBase* operator_builder,
                     vectorized::VSetOperationNode<is_intersect>* set_node);
diff --git a/be/src/pipeline/exec/set_source_operator.cpp b/be/src/pipeline/exec/set_source_operator.cpp
index bb14936e83..078df0476e 100644
--- a/be/src/pipeline/exec/set_source_operator.cpp
+++ b/be/src/pipeline/exec/set_source_operator.cpp
@@ -34,7 +34,7 @@ OperatorPtr SetSourceOperatorBuilder<is_intersect>::build_operator() {
 template <bool is_intersect>
 SetSourceOperator<is_intersect>::SetSourceOperator(
         OperatorBuilderBase* builder, vectorized::VSetOperationNode<is_intersect>* set_node)
-        : Operator<SetSourceOperatorBuilder<is_intersect>>(builder, set_node) {}
+        : SourceOperator<SetSourceOperatorBuilder<is_intersect>>(builder, set_node) {}
 
 template class SetSourceOperatorBuilder<true>;
 template class SetSourceOperatorBuilder<false>;
diff --git a/be/src/pipeline/exec/set_source_operator.h b/be/src/pipeline/exec/set_source_operator.h
index f8d3969fa6..4a90162759 100644
--- a/be/src/pipeline/exec/set_source_operator.h
+++ b/be/src/pipeline/exec/set_source_operator.h
@@ -43,7 +43,7 @@ public:
 };
 
 template <bool is_intersect>
-class SetSourceOperator : public Operator<SetSourceOperatorBuilder<is_intersect>> {
+class SetSourceOperator : public SourceOperator<SetSourceOperatorBuilder<is_intersect>> {
 public:
     SetSourceOperator(OperatorBuilderBase* builder,
                       vectorized::VSetOperationNode<is_intersect>* set_node);
diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp
index 6f65718b3a..cdcd75aaaa 100644
--- a/be/src/pipeline/exec/sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/sort_sink_operator.cpp
@@ -21,6 +21,6 @@
 
 namespace doris::pipeline {
 
-OPERATOR_CODE_GENERATOR(SortSinkOperator, Operator)
+OPERATOR_CODE_GENERATOR(SortSinkOperator, StreamingOperator)
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h
index 6574486879..aa317e12e9 100644
--- a/be/src/pipeline/exec/sort_sink_operator.h
+++ b/be/src/pipeline/exec/sort_sink_operator.h
@@ -36,7 +36,7 @@ public:
     OperatorPtr build_operator() override;
 };
 
-class SortSinkOperator final : public Operator<SortSinkOperatorBuilder> {
+class SortSinkOperator final : public StreamingOperator<SortSinkOperatorBuilder> {
 public:
     SortSinkOperator(OperatorBuilderBase* operator_builder, ExecNode* sort_node);
 
diff --git a/be/src/pipeline/exec/sort_source_operator.cpp b/be/src/pipeline/exec/sort_source_operator.cpp
index e0be159263..a2b859d949 100644
--- a/be/src/pipeline/exec/sort_source_operator.cpp
+++ b/be/src/pipeline/exec/sort_source_operator.cpp
@@ -21,6 +21,6 @@
 
 namespace doris::pipeline {
 
-OPERATOR_CODE_GENERATOR(SortSourceOperator, Operator)
+OPERATOR_CODE_GENERATOR(SortSourceOperator, SourceOperator)
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/sort_source_operator.h b/be/src/pipeline/exec/sort_source_operator.h
index f30f4a725c..017e91f60c 100644
--- a/be/src/pipeline/exec/sort_source_operator.h
+++ b/be/src/pipeline/exec/sort_source_operator.h
@@ -36,7 +36,7 @@ public:
     OperatorPtr build_operator() override;
 };
 
-class SortSourceOperator final : public Operator<SortSourceOperatorBuilder> {
+class SortSourceOperator final : public SourceOperator<SortSourceOperatorBuilder> {
 public:
     SortSourceOperator(OperatorBuilderBase* operator_builder, ExecNode* sort_node);
     Status open(RuntimeState*) override { return Status::OK(); }
diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp
index 2a6c321db8..d4049476ac 100644
--- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp
@@ -24,10 +24,10 @@ namespace doris::pipeline {
 StreamingAggSinkOperator::StreamingAggSinkOperator(OperatorBuilderBase* operator_builder,
                                                    ExecNode* agg_node,
                                                    std::shared_ptr<AggContext> agg_context)
-        : Operator(operator_builder, agg_node), _agg_context(std::move(agg_context)) {}
+        : StreamingOperator(operator_builder, agg_node), _agg_context(std::move(agg_context)) {}
 
 Status StreamingAggSinkOperator::prepare(RuntimeState* state) {
-    RETURN_IF_ERROR(Operator::prepare(state));
+    RETURN_IF_ERROR(StreamingOperator::prepare(state));
     _queue_byte_size_counter =
             ADD_COUNTER(_runtime_profile.get(), "MaxSizeInBlockQueue", TUnit::BYTES);
     _queue_size_counter = ADD_COUNTER(_runtime_profile.get(), "MaxSizeOfBlockQueue", TUnit::UNIT);
@@ -60,14 +60,14 @@ Status StreamingAggSinkOperator::sink(RuntimeState* state, vectorized::Block* in
 }
 
 Status StreamingAggSinkOperator::close(RuntimeState* state) {
-    Operator::close(state);
     if (_agg_context && !_agg_context->is_finish()) {
         // finish should be set, if not set here means error.
         _agg_context->set_canceled();
     }
     COUNTER_SET(_queue_size_counter, _agg_context->max_size_of_queue());
     COUNTER_SET(_queue_byte_size_counter, _agg_context->max_bytes_in_queue());
-    return Status::OK();
+    return StreamingOperator::close(state);
+    ;
 }
 
 StreamingAggSinkOperatorBuilder::StreamingAggSinkOperatorBuilder(
diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
index fe3edc7b52..26e080c2a3 100644
--- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
@@ -42,7 +42,7 @@ private:
     std::shared_ptr<AggContext> _agg_context;
 };
 
-class StreamingAggSinkOperator final : public Operator<StreamingAggSinkOperatorBuilder> {
+class StreamingAggSinkOperator final : public StreamingOperator<StreamingAggSinkOperatorBuilder> {
 public:
     StreamingAggSinkOperator(OperatorBuilderBase* operator_builder, ExecNode*,
                              std::shared_ptr<AggContext>);
diff --git a/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp
index e246876a0c..5451ae86b9 100644
--- a/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp
@@ -23,7 +23,7 @@ namespace doris {
 namespace pipeline {
 StreamingAggSourceOperator::StreamingAggSourceOperator(OperatorBuilderBase* templ, ExecNode* node,
                                                        std::shared_ptr<AggContext> agg_context)
-        : Operator(templ, node), _agg_context(std::move(agg_context)) {}
+        : SourceOperator(templ, node), _agg_context(std::move(agg_context)) {}
 
 bool StreamingAggSourceOperator::can_read() {
     return _agg_context->has_data_or_finished();
diff --git a/be/src/pipeline/exec/streaming_aggregation_source_operator.h b/be/src/pipeline/exec/streaming_aggregation_source_operator.h
index 11ec3a6725..fba6fab2a4 100644
--- a/be/src/pipeline/exec/streaming_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_source_operator.h
@@ -38,7 +38,7 @@ private:
     std::shared_ptr<AggContext> _agg_context;
 };
 
-class StreamingAggSourceOperator final : public Operator<StreamingAggSourceOperatorBuilder> {
+class StreamingAggSourceOperator final : public SourceOperator<StreamingAggSourceOperatorBuilder> {
 public:
     StreamingAggSourceOperator(OperatorBuilderBase*, ExecNode*, std::shared_ptr<AggContext>);
     bool can_read() override;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 0a6a76e3a9..f88fd6b931 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -38,6 +38,7 @@
 #include "exec/scan_node.h"
 #include "exec/scan_operator.h"
 #include "exec/schema_scan_operator.h"
+#include "exec/select_operator.h"
 #include "exec/set_probe_sink_operator.h"
 #include "exec/set_sink_operator.h"
 #include "exec/set_source_operator.h"
@@ -459,6 +460,13 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
         RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(node, cur_pipe));
         break;
     }
+    case TPlanNodeType::SELECT_NODE: {
+        RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
+        OperatorBuilderPtr builder =
+                std::make_shared<SelectOperatorBuilder>(next_operator_builder_id(), node);
+        RETURN_IF_ERROR(cur_pipe->add_operator(builder));
+        break;
+    }
     default:
         return Status::InternalError("Unsupported exec type in pipeline: {}",
                                      print_plan_node_type(node_type));
diff --git a/be/src/vec/exec/vselect_node.cpp b/be/src/vec/exec/vselect_node.cpp
index 8f561f10a0..c8060d5e41 100644
--- a/be/src/vec/exec/vselect_node.cpp
+++ b/be/src/vec/exec/vselect_node.cpp
@@ -57,8 +57,16 @@ Status VSelectNode::get_next(RuntimeState* state, vectorized::Block* block, bool
         }
     } while (block->rows() == 0);
 
-    RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block, block->columns()));
-    reached_limit(block, eos);
+    return pull(state, block, eos);
+}
+
+Status VSelectNode::pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) {
+    INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VSelectNode::pull");
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    RETURN_IF_CANCELLED(state);
+    RETURN_IF_ERROR(
+            VExprContext::filter_block(_vconjunct_ctx_ptr, output_block, output_block->columns()));
+    reached_limit(output_block, eos);
 
     return Status::OK();
 }
diff --git a/be/src/vec/exec/vselect_node.h b/be/src/vec/exec/vselect_node.h
index 9cabb82189..afff5cb734 100644
--- a/be/src/vec/exec/vselect_node.h
+++ b/be/src/vec/exec/vselect_node.h
@@ -21,15 +21,16 @@
 namespace doris {
 namespace vectorized {
 
-class VSelectNode : public ExecNode {
+class VSelectNode final : public ExecNode {
 public:
     VSelectNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
-    virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr);
-    virtual Status prepare(RuntimeState* state);
-    virtual Status open(RuntimeState* state);
-    virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos);
-    virtual Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos);
-    virtual Status close(RuntimeState* state);
+    Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
+    Status prepare(RuntimeState* state) override;
+    Status open(RuntimeState* state) override;
+    Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
+    Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) override;
+    Status close(RuntimeState* state) override;
+    Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override;
 
 private:
     // true if last get_next() call on child signalled eos


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org