You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ga...@apache.org on 2022/12/11 13:31:40 UTC
[doris] branch master updated: [Pipeline](select node) Support select node on pipeline engine (#14928)
This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7fb695b51d [Pipeline](select node) Support select node on pipeline engine (#14928)
7fb695b51d is described below
commit 7fb695b51d0c21bf9265b9458736a304286f1765
Author: Gabriel <ga...@gmail.com>
AuthorDate: Sun Dec 11 21:31:32 2022 +0800
[Pipeline](select node) Support select node on pipeline engine (#14928)
---
be/src/pipeline/CMakeLists.txt | 3 +-
be/src/pipeline/exec/aggregation_sink_operator.cpp | 2 +-
be/src/pipeline/exec/aggregation_sink_operator.h | 2 +-
.../pipeline/exec/aggregation_source_operator.cpp | 2 +-
be/src/pipeline/exec/aggregation_source_operator.h | 2 +-
be/src/pipeline/exec/analytic_sink_operator.cpp | 2 +-
be/src/pipeline/exec/analytic_sink_operator.h | 2 +-
be/src/pipeline/exec/analytic_source_operator.cpp | 2 +-
be/src/pipeline/exec/analytic_source_operator.h | 2 +-
be/src/pipeline/exec/assert_num_rows_operator.h | 4 +-
be/src/pipeline/exec/const_value_operator.h | 4 +-
be/src/pipeline/exec/datagen_operator.cpp | 6 +--
be/src/pipeline/exec/datagen_operator.h | 2 +-
be/src/pipeline/exec/empty_set_operator.cpp | 2 +-
be/src/pipeline/exec/empty_set_operator.h | 2 +-
be/src/pipeline/exec/exchange_source_operator.cpp | 2 +-
be/src/pipeline/exec/exchange_source_operator.h | 2 +-
be/src/pipeline/exec/hashjoin_build_sink.cpp | 2 +-
be/src/pipeline/exec/hashjoin_build_sink.h | 2 +-
be/src/pipeline/exec/mysql_scan_operator.cpp | 6 +--
be/src/pipeline/exec/mysql_scan_operator.h | 2 +-
.../exec/nested_loop_join_build_operator.cpp | 2 +-
.../exec/nested_loop_join_build_operator.h | 2 +-
be/src/pipeline/exec/operator.h | 54 ++++++++++++++++++----
be/src/pipeline/exec/scan_operator.cpp | 6 +--
be/src/pipeline/exec/scan_operator.h | 2 +-
be/src/pipeline/exec/schema_scan_operator.cpp | 4 +-
be/src/pipeline/exec/schema_scan_operator.h | 2 +-
...lytic_sink_operator.cpp => select_operator.cpp} | 7 +--
.../{hashjoin_build_sink.h => select_operator.h} | 15 +++---
be/src/pipeline/exec/set_probe_sink_operator.cpp | 2 +-
be/src/pipeline/exec/set_probe_sink_operator.h | 2 +-
be/src/pipeline/exec/set_sink_operator.cpp | 2 +-
be/src/pipeline/exec/set_sink_operator.h | 2 +-
be/src/pipeline/exec/set_source_operator.cpp | 2 +-
be/src/pipeline/exec/set_source_operator.h | 2 +-
be/src/pipeline/exec/sort_sink_operator.cpp | 2 +-
be/src/pipeline/exec/sort_sink_operator.h | 2 +-
be/src/pipeline/exec/sort_source_operator.cpp | 2 +-
be/src/pipeline/exec/sort_source_operator.h | 2 +-
.../exec/streaming_aggregation_sink_operator.cpp | 8 ++--
.../exec/streaming_aggregation_sink_operator.h | 2 +-
.../exec/streaming_aggregation_source_operator.cpp | 2 +-
.../exec/streaming_aggregation_source_operator.h | 2 +-
be/src/pipeline/pipeline_fragment_context.cpp | 8 ++++
be/src/vec/exec/vselect_node.cpp | 12 ++++-
be/src/vec/exec/vselect_node.h | 15 +++---
47 files changed, 134 insertions(+), 84 deletions(-)
diff --git a/be/src/pipeline/CMakeLists.txt b/be/src/pipeline/CMakeLists.txt
index bca5816c2b..31531fcbc5 100644
--- a/be/src/pipeline/CMakeLists.txt
+++ b/be/src/pipeline/CMakeLists.txt
@@ -53,7 +53,8 @@ set(PIPELINE_FILES
exec/nested_loop_join_probe_operator.cpp
exec/set_sink_operator.cpp
exec/set_source_operator.cpp
- exec/set_probe_sink_operator.cpp)
+ exec/set_probe_sink_operator.cpp
+ exec/select_operator.cpp)
add_library(Pipeline STATIC
${PIPELINE_FILES}
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index ae1205ce1a..ce53e4d6a4 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -21,6 +21,6 @@
namespace doris::pipeline {
-OPERATOR_CODE_GENERATOR(AggSinkOperator, Operator)
+OPERATOR_CODE_GENERATOR(AggSinkOperator, StreamingOperator)
} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h
index 21d9ad7d20..7b5c4ee2f6 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -37,7 +37,7 @@ public:
bool is_sink() const override { return true; };
};
-class AggSinkOperator final : public Operator<AggSinkOperatorBuilder> {
+class AggSinkOperator final : public StreamingOperator<AggSinkOperatorBuilder> {
public:
AggSinkOperator(OperatorBuilderBase* operator_builder, ExecNode* node);
bool can_write() override { return true; };
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp
index daecafdd0e..e3b2aa1f23 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -22,7 +22,7 @@
namespace doris {
namespace pipeline {
-OPERATOR_CODE_GENERATOR(AggSourceOperator, Operator)
+OPERATOR_CODE_GENERATOR(AggSourceOperator, SourceOperator)
} // namespace pipeline
} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/exec/aggregation_source_operator.h b/be/src/pipeline/exec/aggregation_source_operator.h
index bd3cbd9cd4..04c28abd40 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.h
+++ b/be/src/pipeline/exec/aggregation_source_operator.h
@@ -34,7 +34,7 @@ public:
OperatorPtr build_operator() override;
};
-class AggSourceOperator final : public Operator<AggSourceOperatorBuilder> {
+class AggSourceOperator final : public SourceOperator<AggSourceOperatorBuilder> {
public:
AggSourceOperator(OperatorBuilderBase*, ExecNode*);
// if exec node split to: sink, source operator. the source operator
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp
index 0e06e517d9..4a17359ea7 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -20,6 +20,6 @@
namespace doris::pipeline {
-OPERATOR_CODE_GENERATOR(AnalyticSinkOperator, Operator)
+OPERATOR_CODE_GENERATOR(AnalyticSinkOperator, StreamingOperator)
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h
index b330b37637..c786ffb9d7 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -36,7 +36,7 @@ public:
bool is_sink() const override { return true; };
};
-class AnalyticSinkOperator final : public Operator<AnalyticSinkOperatorBuilder> {
+class AnalyticSinkOperator final : public StreamingOperator<AnalyticSinkOperatorBuilder> {
public:
AnalyticSinkOperator(OperatorBuilderBase* operator_builder, ExecNode* node);
diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp b/be/src/pipeline/exec/analytic_source_operator.cpp
index 7f0194a62f..9c9717d24a 100644
--- a/be/src/pipeline/exec/analytic_source_operator.cpp
+++ b/be/src/pipeline/exec/analytic_source_operator.cpp
@@ -19,6 +19,6 @@
namespace doris::pipeline {
-OPERATOR_CODE_GENERATOR(AnalyticSourceOperator, Operator)
+OPERATOR_CODE_GENERATOR(AnalyticSourceOperator, SourceOperator)
} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/analytic_source_operator.h b/be/src/pipeline/exec/analytic_source_operator.h
index 0d25bb850b..34fd9cdfde 100644
--- a/be/src/pipeline/exec/analytic_source_operator.h
+++ b/be/src/pipeline/exec/analytic_source_operator.h
@@ -37,7 +37,7 @@ public:
OperatorPtr build_operator() override;
};
-class AnalyticSourceOperator final : public Operator<AnalyticSourceOperatorBuilder> {
+class AnalyticSourceOperator final : public SourceOperator<AnalyticSourceOperatorBuilder> {
public:
AnalyticSourceOperator(OperatorBuilderBase*, ExecNode*);
diff --git a/be/src/pipeline/exec/assert_num_rows_operator.h b/be/src/pipeline/exec/assert_num_rows_operator.h
index 60bbc7c49c..40acfd05f8 100644
--- a/be/src/pipeline/exec/assert_num_rows_operator.h
+++ b/be/src/pipeline/exec/assert_num_rows_operator.h
@@ -32,10 +32,10 @@ public:
OperatorPtr build_operator() override;
};
-class AssertNumRowsOperator final : public Operator<AssertNumRowsOperatorBuilder> {
+class AssertNumRowsOperator final : public StreamingOperator<AssertNumRowsOperatorBuilder> {
public:
AssertNumRowsOperator(OperatorBuilderBase* operator_builder, ExecNode* node)
- : Operator(operator_builder, node) {};
+ : StreamingOperator(operator_builder, node) {};
};
OperatorPtr AssertNumRowsOperatorBuilder::build_operator() {
diff --git a/be/src/pipeline/exec/const_value_operator.h b/be/src/pipeline/exec/const_value_operator.h
index 838005d2f1..467f5ded05 100644
--- a/be/src/pipeline/exec/const_value_operator.h
+++ b/be/src/pipeline/exec/const_value_operator.h
@@ -36,10 +36,10 @@ public:
bool is_source() const override { return true; }
};
-class ConstValueOperator final : public Operator<ConstValueOperatorBuilder> {
+class ConstValueOperator final : public SourceOperator<ConstValueOperatorBuilder> {
public:
ConstValueOperator(OperatorBuilderBase* operator_builder, ExecNode* node)
- : Operator(operator_builder, node) {};
+ : SourceOperator(operator_builder, node) {};
bool can_read() override { return true; };
};
diff --git a/be/src/pipeline/exec/datagen_operator.cpp b/be/src/pipeline/exec/datagen_operator.cpp
index 729096f803..fc4a32342c 100644
--- a/be/src/pipeline/exec/datagen_operator.cpp
+++ b/be/src/pipeline/exec/datagen_operator.cpp
@@ -21,16 +21,16 @@
namespace doris::pipeline {
-OPERATOR_CODE_GENERATOR(DataGenOperator, Operator)
+OPERATOR_CODE_GENERATOR(DataGenOperator, SourceOperator)
Status DataGenOperator::open(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
- RETURN_IF_ERROR(Operator::open(state));
+ RETURN_IF_ERROR(SourceOperator::open(state));
return _node->open(state);
}
Status DataGenOperator::close(RuntimeState* state) {
- RETURN_IF_ERROR(Operator::close(state));
+ RETURN_IF_ERROR(SourceOperator::close(state));
_node->close(state);
return Status::OK();
}
diff --git a/be/src/pipeline/exec/datagen_operator.h b/be/src/pipeline/exec/datagen_operator.h
index 799fa3c776..27933caf26 100644
--- a/be/src/pipeline/exec/datagen_operator.h
+++ b/be/src/pipeline/exec/datagen_operator.h
@@ -32,7 +32,7 @@ public:
OperatorPtr build_operator() override;
};
-class DataGenOperator : public Operator<DataGenOperatorBuilder> {
+class DataGenOperator : public SourceOperator<DataGenOperatorBuilder> {
public:
DataGenOperator(OperatorBuilderBase* operator_builder, ExecNode* datagen_node);
diff --git a/be/src/pipeline/exec/empty_set_operator.cpp b/be/src/pipeline/exec/empty_set_operator.cpp
index 2f607080d5..be1178e9a6 100644
--- a/be/src/pipeline/exec/empty_set_operator.cpp
+++ b/be/src/pipeline/exec/empty_set_operator.cpp
@@ -21,6 +21,6 @@
namespace doris::pipeline {
-OPERATOR_CODE_GENERATOR(EmptySetSourceOperator, Operator)
+OPERATOR_CODE_GENERATOR(EmptySetSourceOperator, SourceOperator)
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/empty_set_operator.h b/be/src/pipeline/exec/empty_set_operator.h
index 8cffcd2783..f0dc39e1fd 100644
--- a/be/src/pipeline/exec/empty_set_operator.h
+++ b/be/src/pipeline/exec/empty_set_operator.h
@@ -33,7 +33,7 @@ public:
OperatorPtr build_operator() override;
};
-class EmptySetSourceOperator final : public Operator<EmptySetSourceOperatorBuilder> {
+class EmptySetSourceOperator final : public SourceOperator<EmptySetSourceOperatorBuilder> {
public:
EmptySetSourceOperator(OperatorBuilderBase* operator_builder, ExecNode* empty_set_node);
bool can_read() override { return true; };
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp
index 31bed810cb..88df3c57c7 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -23,7 +23,7 @@
namespace doris::pipeline {
-OPERATOR_CODE_GENERATOR(ExchangeSourceOperator, Operator)
+OPERATOR_CODE_GENERATOR(ExchangeSourceOperator, SourceOperator)
bool ExchangeSourceOperator::can_read() {
return _node->_stream_recvr->ready_to_read();
diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h
index 273d6664c5..b6143a86ab 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -34,7 +34,7 @@ public:
OperatorPtr build_operator() override;
};
-class ExchangeSourceOperator final : public Operator<ExchangeSourceOperatorBuilder> {
+class ExchangeSourceOperator final : public SourceOperator<ExchangeSourceOperatorBuilder> {
public:
ExchangeSourceOperator(OperatorBuilderBase*, ExecNode*);
bool can_read() override;
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 6ac084fce5..73fdc69252 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -21,6 +21,6 @@
namespace doris::pipeline {
-OPERATOR_CODE_GENERATOR(HashJoinBuildSink, Operator)
+OPERATOR_CODE_GENERATOR(HashJoinBuildSink, StreamingOperator)
} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h
index aa7cbf22bb..404b8e53ad 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -36,7 +36,7 @@ public:
bool is_sink() const override { return true; };
};
-class HashJoinBuildSink final : public Operator<HashJoinBuildSinkBuilder> {
+class HashJoinBuildSink final : public StreamingOperator<HashJoinBuildSinkBuilder> {
public:
HashJoinBuildSink(OperatorBuilderBase* operator_builder, ExecNode* node);
bool can_write() override { return true; };
diff --git a/be/src/pipeline/exec/mysql_scan_operator.cpp b/be/src/pipeline/exec/mysql_scan_operator.cpp
index 575d272c33..e62502ae17 100644
--- a/be/src/pipeline/exec/mysql_scan_operator.cpp
+++ b/be/src/pipeline/exec/mysql_scan_operator.cpp
@@ -21,16 +21,16 @@
namespace doris::pipeline {
-OPERATOR_CODE_GENERATOR(MysqlScanOperator, Operator)
+OPERATOR_CODE_GENERATOR(MysqlScanOperator, SourceOperator)
Status MysqlScanOperator::open(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
- RETURN_IF_ERROR(Operator::open(state));
+ RETURN_IF_ERROR(SourceOperator::open(state));
return _node->open(state);
}
Status MysqlScanOperator::close(RuntimeState* state) {
- RETURN_IF_ERROR(Operator::close(state));
+ RETURN_IF_ERROR(SourceOperator::close(state));
_node->close(state);
return Status::OK();
}
diff --git a/be/src/pipeline/exec/mysql_scan_operator.h b/be/src/pipeline/exec/mysql_scan_operator.h
index 798b698691..6ff5b5217e 100644
--- a/be/src/pipeline/exec/mysql_scan_operator.h
+++ b/be/src/pipeline/exec/mysql_scan_operator.h
@@ -29,7 +29,7 @@ public:
OperatorPtr build_operator() override;
};
-class MysqlScanOperator : public Operator<MysqlScanOperatorBuilder> {
+class MysqlScanOperator : public SourceOperator<MysqlScanOperatorBuilder> {
public:
MysqlScanOperator(OperatorBuilderBase* operator_builder, ExecNode* mysql_scan_node);
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
index 2467b9a8b9..5e3245b983 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
@@ -21,6 +21,6 @@
namespace doris::pipeline {
-OPERATOR_CODE_GENERATOR(NestLoopJoinBuildOperator, Operator)
+OPERATOR_CODE_GENERATOR(NestLoopJoinBuildOperator, StreamingOperator)
} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h b/be/src/pipeline/exec/nested_loop_join_build_operator.h
index 14d4f7108d..e5c9f3f499 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.h
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h
@@ -37,7 +37,7 @@ public:
bool is_sink() const override { return true; };
};
-class NestLoopJoinBuildOperator final : public Operator<NestLoopJoinBuildOperatorBuilder> {
+class NestLoopJoinBuildOperator final : public StreamingOperator<NestLoopJoinBuildOperatorBuilder> {
public:
NestLoopJoinBuildOperator(OperatorBuilderBase* operator_builder, ExecNode* node);
bool can_write() override { return true; };
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index a17e8dd1f4..68870fc57b 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -24,6 +24,8 @@
#include "runtime/runtime_state.h"
#include "vec/core/block.h"
#include "vec/exec/vdata_gen_scan_node.h"
+#include "vec/exec/vselect_node.h"
+#include "vec/exec/vunion_node.h"
#define OPERATOR_CODE_GENERATOR(NAME, SUBCLASS) \
NAME##Builder::NAME##Builder(int32_t id, ExecNode* exec_node) \
@@ -143,6 +145,8 @@ public:
explicit OperatorBase(OperatorBuilderBase* operator_builder);
virtual ~OperatorBase() = default;
+ virtual std::string get_name() const = 0;
+
bool is_sink() const;
bool is_source() const;
@@ -254,11 +258,13 @@ public:
~DataSinkOperator() override = default;
+ std::string get_name() const override { return "DataSinkOperator"; }
+
Status prepare(RuntimeState* state) override {
RETURN_IF_ERROR(_sink->prepare(state));
_runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
_sink->profile()->insert_child_head(_runtime_profile.get(), true);
- _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+ _mem_tracker = std::make_unique<MemTracker>("DataSinkOperator:" + _runtime_profile->name(),
_runtime_profile.get());
return Status::OK();
}
@@ -297,20 +303,22 @@ protected:
* All operators inherited from Operator will hold a ExecNode inside.
*/
template <typename OperatorBuilderType>
-class Operator : public OperatorBase {
+class StreamingOperator : public OperatorBase {
public:
using NodeType =
std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
- Operator(OperatorBuilderBase* builder, ExecNode* node)
+ StreamingOperator(OperatorBuilderBase* builder, ExecNode* node)
: OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
- ~Operator() override = default;
+ ~StreamingOperator() override = default;
+
+ std::string get_name() const override { return "StreamingOperator"; }
Status prepare(RuntimeState* state) override {
_runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
_node->runtime_profile()->insert_child_head(_runtime_profile.get(), true);
- _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
+ _mem_tracker = std::make_unique<MemTracker>(get_name() + ": " + _runtime_profile->name(),
_runtime_profile.get());
_node->increase_ref();
return Status::OK();
@@ -339,9 +347,10 @@ public:
Status get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) override {
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ DCHECK(_child);
+ RETURN_IF_ERROR(_child->get_block(state, block, source_state));
bool eos = false;
RETURN_IF_ERROR(_node->pull(state, block, &eos));
- source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE;
return Status::OK();
}
@@ -358,6 +367,29 @@ protected:
NodeType* _node;
};
+template <typename OperatorBuilderType>
+class SourceOperator : public StreamingOperator<OperatorBuilderType> {
+public:
+ using NodeType =
+ std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
+
+ SourceOperator(OperatorBuilderBase* builder, ExecNode* node)
+ : StreamingOperator<OperatorBuilderType>(builder, node) {};
+
+ ~SourceOperator() override = default;
+
+ std::string get_name() const override { return "SourceOperator"; }
+
+ Status get_block(RuntimeState* state, vectorized::Block* block,
+ SourceState& source_state) override {
+ auto& node = StreamingOperator<OperatorBuilderType>::_node;
+ bool eos = false;
+ RETURN_IF_ERROR(node->pull(state, block, &eos));
+ source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE;
+ return Status::OK();
+ }
+};
+
/**
* StatefulOperator indicates the operators with some states inside.
*
@@ -367,22 +399,24 @@ protected:
* In a nutshell, it is a one-to-many relation between input blocks and output blocks for StatefulOperator.
*/
template <typename OperatorBuilderType>
-class StatefulOperator : public Operator<OperatorBuilderType> {
+class StatefulOperator : public StreamingOperator<OperatorBuilderType> {
public:
using NodeType =
std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
StatefulOperator(OperatorBuilderBase* builder, ExecNode* node)
- : Operator<OperatorBuilderType>(builder, node),
+ : StreamingOperator<OperatorBuilderType>(builder, node),
_child_block(new vectorized::Block),
_child_source_state(SourceState::DEPEND_ON_SOURCE) {};
virtual ~StatefulOperator() = default;
+ std::string get_name() const override { return "DataStateOperator"; }
+
Status get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) override {
- auto& node = Operator<OperatorBuilderType>::_node;
- auto& child = Operator<OperatorBuilderType>::_child;
+ auto& node = StreamingOperator<OperatorBuilderType>::_node;
+ auto& child = StreamingOperator<OperatorBuilderType>::_child;
if (node->need_more_input_data()) {
RETURN_IF_ERROR(child->get_block(state, _child_block.get(), _child_source_state));
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index bbf63954b7..f28c7284fe 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -22,11 +22,11 @@
namespace doris::pipeline {
-OPERATOR_CODE_GENERATOR(ScanOperator, Operator)
+OPERATOR_CODE_GENERATOR(ScanOperator, SourceOperator)
Status ScanOperator::open(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
- RETURN_IF_ERROR(Operator::open(state));
+ RETURN_IF_ERROR(SourceOperator::open(state));
return _node->open(state);
}
@@ -48,7 +48,7 @@ bool ScanOperator::is_pending_finish() const {
}
Status ScanOperator::close(RuntimeState* state) {
- RETURN_IF_ERROR(Operator::close(state));
+ RETURN_IF_ERROR(SourceOperator::close(state));
_node->close(state);
return Status::OK();
}
diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h
index 14292b141b..964625fa7d 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -34,7 +34,7 @@ public:
OperatorPtr build_operator() override;
};
-class ScanOperator : public Operator<ScanOperatorBuilder> {
+class ScanOperator : public SourceOperator<ScanOperatorBuilder> {
public:
ScanOperator(OperatorBuilderBase* operator_builder, ExecNode* scan_node);
diff --git a/be/src/pipeline/exec/schema_scan_operator.cpp b/be/src/pipeline/exec/schema_scan_operator.cpp
index 7b887b084d..497d425bb8 100644
--- a/be/src/pipeline/exec/schema_scan_operator.cpp
+++ b/be/src/pipeline/exec/schema_scan_operator.cpp
@@ -21,7 +21,7 @@
namespace doris::pipeline {
-OPERATOR_CODE_GENERATOR(SchemaScanOperator, Operator)
+OPERATOR_CODE_GENERATOR(SchemaScanOperator, SourceOperator)
Status SchemaScanOperator::open(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
@@ -29,7 +29,7 @@ Status SchemaScanOperator::open(RuntimeState* state) {
}
Status SchemaScanOperator::close(RuntimeState* state) {
- RETURN_IF_ERROR(Operator::close(state));
+ RETURN_IF_ERROR(SourceOperator::close(state));
_node->close(state);
return Status::OK();
}
diff --git a/be/src/pipeline/exec/schema_scan_operator.h b/be/src/pipeline/exec/schema_scan_operator.h
index 9684bde68d..34b56e3b13 100644
--- a/be/src/pipeline/exec/schema_scan_operator.h
+++ b/be/src/pipeline/exec/schema_scan_operator.h
@@ -32,7 +32,7 @@ public:
OperatorPtr build_operator() override;
};
-class SchemaScanOperator : public Operator<SchemaScanOperatorBuilder> {
+class SchemaScanOperator : public SourceOperator<SchemaScanOperatorBuilder> {
public:
SchemaScanOperator(OperatorBuilderBase* operator_builder, ExecNode* scan_node);
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/select_operator.cpp
similarity index 87%
copy from be/src/pipeline/exec/analytic_sink_operator.cpp
copy to be/src/pipeline/exec/select_operator.cpp
index 0e06e517d9..c629c29978 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/select_operator.cpp
@@ -1,4 +1,3 @@
-
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -16,10 +15,12 @@
// specific language governing permissions and limitations
// under the License.
-#include "analytic_sink_operator.h"
+#include "select_operator.h"
+
+#include "vec/exec/vselect_node.h"
namespace doris::pipeline {
-OPERATOR_CODE_GENERATOR(AnalyticSinkOperator, Operator)
+OPERATOR_CODE_GENERATOR(SelectOperator, StreamingOperator)
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/select_operator.h
similarity index 70%
copy from be/src/pipeline/exec/hashjoin_build_sink.h
copy to be/src/pipeline/exec/select_operator.h
index aa7cbf22bb..d71cb319cf 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/select_operator.h
@@ -21,26 +21,23 @@
namespace doris {
namespace vectorized {
-class HashJoinNode;
+class VSelectNode;
class VExprContext;
class Block;
} // namespace vectorized
-
namespace pipeline {
-class HashJoinBuildSinkBuilder final : public OperatorBuilder<vectorized::HashJoinNode> {
+class SelectOperatorBuilder final : public OperatorBuilder<vectorized::VSelectNode> {
public:
- HashJoinBuildSinkBuilder(int32_t, ExecNode*);
+ SelectOperatorBuilder(int32_t id, ExecNode* select_node);
OperatorPtr build_operator() override;
- bool is_sink() const override { return true; };
};
-class HashJoinBuildSink final : public Operator<HashJoinBuildSinkBuilder> {
+class SelectOperator final : public StreamingOperator<SelectOperatorBuilder> {
public:
- HashJoinBuildSink(OperatorBuilderBase* operator_builder, ExecNode* node);
- bool can_write() override { return true; };
+ SelectOperator(OperatorBuilderBase* operator_builder, ExecNode* select_node);
};
} // namespace pipeline
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp b/be/src/pipeline/exec/set_probe_sink_operator.cpp
index 2f51edd12a..705161725b 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp
@@ -35,7 +35,7 @@ OperatorPtr SetProbeSinkOperatorBuilder<is_intersect>::build_operator() {
template <bool is_intersect>
SetProbeSinkOperator<is_intersect>::SetProbeSinkOperator(OperatorBuilderBase* operator_builder,
int child_id, ExecNode* set_node)
- : Operator<SetProbeSinkOperatorBuilder<is_intersect>>(operator_builder, set_node),
+ : StreamingOperator<SetProbeSinkOperatorBuilder<is_intersect>>(operator_builder, set_node),
_child_id(child_id) {}
template <bool is_intersect>
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h
index fab23dd8c1..2fbed5e990 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -46,7 +46,7 @@ private:
};
template <bool is_intersect>
-class SetProbeSinkOperator : public Operator<SetProbeSinkOperatorBuilder<is_intersect>> {
+class SetProbeSinkOperator : public StreamingOperator<SetProbeSinkOperatorBuilder<is_intersect>> {
public:
SetProbeSinkOperator(OperatorBuilderBase* operator_builder, int child_id, ExecNode* set_node);
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp
index aaa85c31f1..ea85b4f54e 100644
--- a/be/src/pipeline/exec/set_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -34,7 +34,7 @@ OperatorPtr SetSinkOperatorBuilder<is_intersect>::build_operator() {
template <bool is_intersect>
SetSinkOperator<is_intersect>::SetSinkOperator(
OperatorBuilderBase* builder, vectorized::VSetOperationNode<is_intersect>* set_node)
- : Operator<SetSinkOperatorBuilder<is_intersect>>(builder, set_node) {}
+ : StreamingOperator<SetSinkOperatorBuilder<is_intersect>>(builder, set_node) {}
template class SetSinkOperatorBuilder<true>;
template class SetSinkOperatorBuilder<false>;
diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h
index 2f45b04e35..d729666ee9 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -43,7 +43,7 @@ public:
};
template <bool is_intersect>
-class SetSinkOperator : public Operator<SetSinkOperatorBuilder<is_intersect>> {
+class SetSinkOperator : public StreamingOperator<SetSinkOperatorBuilder<is_intersect>> {
public:
SetSinkOperator(OperatorBuilderBase* operator_builder,
vectorized::VSetOperationNode<is_intersect>* set_node);
diff --git a/be/src/pipeline/exec/set_source_operator.cpp b/be/src/pipeline/exec/set_source_operator.cpp
index bb14936e83..078df0476e 100644
--- a/be/src/pipeline/exec/set_source_operator.cpp
+++ b/be/src/pipeline/exec/set_source_operator.cpp
@@ -34,7 +34,7 @@ OperatorPtr SetSourceOperatorBuilder<is_intersect>::build_operator() {
template <bool is_intersect>
SetSourceOperator<is_intersect>::SetSourceOperator(
OperatorBuilderBase* builder, vectorized::VSetOperationNode<is_intersect>* set_node)
- : Operator<SetSourceOperatorBuilder<is_intersect>>(builder, set_node) {}
+ : SourceOperator<SetSourceOperatorBuilder<is_intersect>>(builder, set_node) {}
template class SetSourceOperatorBuilder<true>;
template class SetSourceOperatorBuilder<false>;
diff --git a/be/src/pipeline/exec/set_source_operator.h b/be/src/pipeline/exec/set_source_operator.h
index f8d3969fa6..4a90162759 100644
--- a/be/src/pipeline/exec/set_source_operator.h
+++ b/be/src/pipeline/exec/set_source_operator.h
@@ -43,7 +43,7 @@ public:
};
template <bool is_intersect>
-class SetSourceOperator : public Operator<SetSourceOperatorBuilder<is_intersect>> {
+class SetSourceOperator : public SourceOperator<SetSourceOperatorBuilder<is_intersect>> {
public:
SetSourceOperator(OperatorBuilderBase* builder,
vectorized::VSetOperationNode<is_intersect>* set_node);
diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp
index 6f65718b3a..cdcd75aaaa 100644
--- a/be/src/pipeline/exec/sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/sort_sink_operator.cpp
@@ -21,6 +21,6 @@
namespace doris::pipeline {
-OPERATOR_CODE_GENERATOR(SortSinkOperator, Operator)
+OPERATOR_CODE_GENERATOR(SortSinkOperator, StreamingOperator)
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h
index 6574486879..aa317e12e9 100644
--- a/be/src/pipeline/exec/sort_sink_operator.h
+++ b/be/src/pipeline/exec/sort_sink_operator.h
@@ -36,7 +36,7 @@ public:
OperatorPtr build_operator() override;
};
-class SortSinkOperator final : public Operator<SortSinkOperatorBuilder> {
+class SortSinkOperator final : public StreamingOperator<SortSinkOperatorBuilder> {
public:
SortSinkOperator(OperatorBuilderBase* operator_builder, ExecNode* sort_node);
diff --git a/be/src/pipeline/exec/sort_source_operator.cpp b/be/src/pipeline/exec/sort_source_operator.cpp
index e0be159263..a2b859d949 100644
--- a/be/src/pipeline/exec/sort_source_operator.cpp
+++ b/be/src/pipeline/exec/sort_source_operator.cpp
@@ -21,6 +21,6 @@
namespace doris::pipeline {
-OPERATOR_CODE_GENERATOR(SortSourceOperator, Operator)
+OPERATOR_CODE_GENERATOR(SortSourceOperator, SourceOperator)
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/sort_source_operator.h b/be/src/pipeline/exec/sort_source_operator.h
index f30f4a725c..017e91f60c 100644
--- a/be/src/pipeline/exec/sort_source_operator.h
+++ b/be/src/pipeline/exec/sort_source_operator.h
@@ -36,7 +36,7 @@ public:
OperatorPtr build_operator() override;
};
-class SortSourceOperator final : public Operator<SortSourceOperatorBuilder> {
+class SortSourceOperator final : public SourceOperator<SortSourceOperatorBuilder> {
public:
SortSourceOperator(OperatorBuilderBase* operator_builder, ExecNode* sort_node);
Status open(RuntimeState*) override { return Status::OK(); }
diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp
index 2a6c321db8..d4049476ac 100644
--- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp
@@ -24,10 +24,10 @@ namespace doris::pipeline {
StreamingAggSinkOperator::StreamingAggSinkOperator(OperatorBuilderBase* operator_builder,
ExecNode* agg_node,
std::shared_ptr<AggContext> agg_context)
- : Operator(operator_builder, agg_node), _agg_context(std::move(agg_context)) {}
+ : StreamingOperator(operator_builder, agg_node), _agg_context(std::move(agg_context)) {}
Status StreamingAggSinkOperator::prepare(RuntimeState* state) {
- RETURN_IF_ERROR(Operator::prepare(state));
+ RETURN_IF_ERROR(StreamingOperator::prepare(state));
_queue_byte_size_counter =
ADD_COUNTER(_runtime_profile.get(), "MaxSizeInBlockQueue", TUnit::BYTES);
_queue_size_counter = ADD_COUNTER(_runtime_profile.get(), "MaxSizeOfBlockQueue", TUnit::UNIT);
@@ -60,14 +60,14 @@ Status StreamingAggSinkOperator::sink(RuntimeState* state, vectorized::Block* in
}
Status StreamingAggSinkOperator::close(RuntimeState* state) {
- Operator::close(state);
if (_agg_context && !_agg_context->is_finish()) {
// finish should be set, if not set here means error.
_agg_context->set_canceled();
}
COUNTER_SET(_queue_size_counter, _agg_context->max_size_of_queue());
COUNTER_SET(_queue_byte_size_counter, _agg_context->max_bytes_in_queue());
- return Status::OK();
+ return StreamingOperator::close(state);
+ ;
}
StreamingAggSinkOperatorBuilder::StreamingAggSinkOperatorBuilder(
diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
index fe3edc7b52..26e080c2a3 100644
--- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
@@ -42,7 +42,7 @@ private:
std::shared_ptr<AggContext> _agg_context;
};
-class StreamingAggSinkOperator final : public Operator<StreamingAggSinkOperatorBuilder> {
+class StreamingAggSinkOperator final : public StreamingOperator<StreamingAggSinkOperatorBuilder> {
public:
StreamingAggSinkOperator(OperatorBuilderBase* operator_builder, ExecNode*,
std::shared_ptr<AggContext>);
diff --git a/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp
index e246876a0c..5451ae86b9 100644
--- a/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp
@@ -23,7 +23,7 @@ namespace doris {
namespace pipeline {
StreamingAggSourceOperator::StreamingAggSourceOperator(OperatorBuilderBase* templ, ExecNode* node,
std::shared_ptr<AggContext> agg_context)
- : Operator(templ, node), _agg_context(std::move(agg_context)) {}
+ : SourceOperator(templ, node), _agg_context(std::move(agg_context)) {}
bool StreamingAggSourceOperator::can_read() {
return _agg_context->has_data_or_finished();
diff --git a/be/src/pipeline/exec/streaming_aggregation_source_operator.h b/be/src/pipeline/exec/streaming_aggregation_source_operator.h
index 11ec3a6725..fba6fab2a4 100644
--- a/be/src/pipeline/exec/streaming_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_source_operator.h
@@ -38,7 +38,7 @@ private:
std::shared_ptr<AggContext> _agg_context;
};
-class StreamingAggSourceOperator final : public Operator<StreamingAggSourceOperatorBuilder> {
+class StreamingAggSourceOperator final : public SourceOperator<StreamingAggSourceOperatorBuilder> {
public:
StreamingAggSourceOperator(OperatorBuilderBase*, ExecNode*, std::shared_ptr<AggContext>);
bool can_read() override;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 0a6a76e3a9..f88fd6b931 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -38,6 +38,7 @@
#include "exec/scan_node.h"
#include "exec/scan_operator.h"
#include "exec/schema_scan_operator.h"
+#include "exec/select_operator.h"
#include "exec/set_probe_sink_operator.h"
#include "exec/set_sink_operator.h"
#include "exec/set_source_operator.h"
@@ -459,6 +460,13 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(node, cur_pipe));
break;
}
+ case TPlanNodeType::SELECT_NODE: {
+ RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
+ OperatorBuilderPtr builder =
+ std::make_shared<SelectOperatorBuilder>(next_operator_builder_id(), node);
+ RETURN_IF_ERROR(cur_pipe->add_operator(builder));
+ break;
+ }
default:
return Status::InternalError("Unsupported exec type in pipeline: {}",
print_plan_node_type(node_type));
diff --git a/be/src/vec/exec/vselect_node.cpp b/be/src/vec/exec/vselect_node.cpp
index 8f561f10a0..c8060d5e41 100644
--- a/be/src/vec/exec/vselect_node.cpp
+++ b/be/src/vec/exec/vselect_node.cpp
@@ -57,8 +57,16 @@ Status VSelectNode::get_next(RuntimeState* state, vectorized::Block* block, bool
}
} while (block->rows() == 0);
- RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block, block->columns()));
- reached_limit(block, eos);
+ return pull(state, block, eos);
+}
+
+Status VSelectNode::pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) {
+ INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VSelectNode::pull");
+ SCOPED_TIMER(_runtime_profile->total_time_counter());
+ RETURN_IF_CANCELLED(state);
+ RETURN_IF_ERROR(
+ VExprContext::filter_block(_vconjunct_ctx_ptr, output_block, output_block->columns()));
+ reached_limit(output_block, eos);
return Status::OK();
}
diff --git a/be/src/vec/exec/vselect_node.h b/be/src/vec/exec/vselect_node.h
index 9cabb82189..afff5cb734 100644
--- a/be/src/vec/exec/vselect_node.h
+++ b/be/src/vec/exec/vselect_node.h
@@ -21,15 +21,16 @@
namespace doris {
namespace vectorized {
-class VSelectNode : public ExecNode {
+class VSelectNode final : public ExecNode {
public:
VSelectNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
- virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr);
- virtual Status prepare(RuntimeState* state);
- virtual Status open(RuntimeState* state);
- virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos);
- virtual Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos);
- virtual Status close(RuntimeState* state);
+ Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
+ Status prepare(RuntimeState* state) override;
+ Status open(RuntimeState* state) override;
+ Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
+ Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) override;
+ Status close(RuntimeState* state) override;
+ Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override;
private:
// true if last get_next() call on child signalled eos
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org