You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/06/30 16:08:18 UTC

[doris] 01/04: [feature](runtime_filter) MultiCastSender supporting runtime filter on be (#21304)

This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch runtimefilter_multi_send
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 2a158caf5d533ecbc5b129f79daf44d88272a665
Author: Jerry Hu <mr...@gmail.com>
AuthorDate: Wed Jun 28 21:09:43 2023 +0800

    [feature](runtime_filter) MultiCastSender supporting runtime filter on be (#21304)
---
 .../exec/multi_cast_data_stream_source.cpp         | 61 +++++++++++++++++++---
 .../pipeline/exec/multi_cast_data_stream_source.h  | 23 ++++++--
 be/src/pipeline/pipeline_fragment_context.cpp      |  3 +-
 ...nsumer_node.cpp => runtime_filter_consumer.cpp} | 44 +++++++++-------
 ...r_consumer_node.h => runtime_filter_consumer.h} | 28 ++++++----
 be/src/vec/exec/scan/vscan_node.cpp                |  4 +-
 be/src/vec/exec/scan/vscan_node.h                  | 10 ++--
 be/src/vec/exec/vselect_node.cpp                   | 20 ++-----
 be/src/vec/exec/vselect_node.h                     |  8 +--
 gensrc/thrift/DataSinks.thrift                     | 12 +++++
 10 files changed, 146 insertions(+), 67 deletions(-)

diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index 06211faf52..18ca62ea15 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -27,14 +27,16 @@
 namespace doris::pipeline {
 
 MultiCastDataStreamerSourceOperatorBuilder::MultiCastDataStreamerSourceOperatorBuilder(
-        int32_t id, const int consumer_id, std::shared_ptr<MultiCastDataStreamer>& data_streamer)
+        int32_t id, const int consumer_id, std::shared_ptr<MultiCastDataStreamer>& data_streamer,
+        const TDataStreamSink& sink)
         : OperatorBuilderBase(id, "MultiCastDataStreamerSourceOperator"),
           _consumer_id(consumer_id),
-          _multi_cast_data_streamer(data_streamer) {};
+          _multi_cast_data_streamer(data_streamer),
+          _t_data_stream_sink(sink) {}
 
 OperatorPtr MultiCastDataStreamerSourceOperatorBuilder::build_operator() {
-    return std::make_shared<MultiCastDataStreamerSourceOperator>(this, _consumer_id,
-                                                                 _multi_cast_data_streamer);
+    return std::make_shared<MultiCastDataStreamerSourceOperator>(
+            this, _consumer_id, _multi_cast_data_streamer, _t_data_stream_sink);
 }
 
 const RowDescriptor& MultiCastDataStreamerSourceOperatorBuilder::row_desc() {
@@ -43,10 +45,44 @@ const RowDescriptor& MultiCastDataStreamerSourceOperatorBuilder::row_desc() {
 
 MultiCastDataStreamerSourceOperator::MultiCastDataStreamerSourceOperator(
         OperatorBuilderBase* operator_builder, const int consumer_id,
-        std::shared_ptr<MultiCastDataStreamer>& data_streamer)
+        std::shared_ptr<MultiCastDataStreamer>& data_streamer, const TDataStreamSink& sink)
         : OperatorBase(operator_builder),
+          vectorized::RuntimeFilterConsumer(sink.dest_node_id, sink.runtime_filters,
+                                            data_streamer->row_desc(), _conjuncts),
           _consumer_id(consumer_id),
-          _multi_cast_data_streamer(data_streamer) {};
+          _multi_cast_data_streamer(data_streamer),
+          _t_data_stream_sink(sink) {}
+
+Status MultiCastDataStreamerSourceOperator::init(const TDataSink& tsink) {
+    RETURN_IF_ERROR(OperatorBase::init(tsink));
+    if (_t_data_stream_sink.__isset.output_exprs) {
+        RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_data_stream_sink.output_exprs,
+                                                             _output_expr_contexts));
+    }
+
+    if (_t_data_stream_sink.__isset.conjuncts) {
+        RETURN_IF_ERROR(
+                vectorized::VExpr::create_expr_trees(_t_data_stream_sink.conjuncts, _conjuncts));
+    }
+
+    return Status::OK();
+}
+
+// Registers runtime filters once (inside RuntimeFilterConsumer::init), then prepares the exprs.
+Status MultiCastDataStreamerSourceOperator::prepare(doris::RuntimeState* state) {
+    RETURN_IF_ERROR(vectorized::RuntimeFilterConsumer::init(state));
+    RETURN_IF_ERROR(vectorized::VExpr::prepare(_conjuncts, state, row_desc()));
+    RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_expr_contexts, state, row_desc()));
+    return Status::OK();
+}
+
+Status MultiCastDataStreamerSourceOperator::open(doris::RuntimeState* state) {
+    return _acquire_runtime_filter(state);
+}
+
+bool MultiCastDataStreamerSourceOperator::runtime_filters_are_ready_or_timeout() {
+    return vectorized::RuntimeFilterConsumer::runtime_filters_are_ready_or_timeout();
+}
 
 bool MultiCastDataStreamerSourceOperator::can_read() {
     return _multi_cast_data_streamer->can_read(_consumer_id);
@@ -56,6 +92,19 @@ Status MultiCastDataStreamerSourceOperator::get_block(RuntimeState* state, vecto
                                                       SourceState& source_state) {
     bool eos = false;
     _multi_cast_data_streamer->pull(_consumer_id, block, &eos);
+    if (!_output_expr_contexts.empty()) {
+        vectorized::Block output_block;
+        RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
+                _output_expr_contexts, *block, &output_block));
+        materialize_block_inplace(output_block);
+        block->swap(output_block);
+    }
+
+    if (!_conjuncts.empty()) {
+        RETURN_IF_ERROR(
+                vectorized::VExprContext::filter_block(_conjuncts, block, block->columns()));
+    }
+
     if (eos) {
         source_state = SourceState::FINISHED;
     }
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h
index 15bd320b89..b2c7ca9e55 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h
@@ -22,6 +22,7 @@
 
 #include "common/status.h"
 #include "operator.h"
+#include "vec/exec/runtime_filter_consumer.h"
 
 namespace doris {
 class ExecNode;
@@ -37,7 +38,8 @@ class MultiCastDataStreamer;
 class MultiCastDataStreamerSourceOperatorBuilder final : public OperatorBuilderBase {
 public:
     MultiCastDataStreamerSourceOperatorBuilder(int32_t id, const int consumer_id,
-                                               std::shared_ptr<MultiCastDataStreamer>&);
+                                               std::shared_ptr<MultiCastDataStreamer>&,
+                                               const TDataStreamSink&);
 
     bool is_source() const override { return true; }
 
@@ -48,20 +50,27 @@ public:
 private:
     const int _consumer_id;
     std::shared_ptr<MultiCastDataStreamer> _multi_cast_data_streamer;
+    TDataStreamSink _t_data_stream_sink;
 };
 
-class MultiCastDataStreamerSourceOperator final : public OperatorBase {
+class MultiCastDataStreamerSourceOperator final : public OperatorBase,
+                                                  public vectorized::RuntimeFilterConsumer {
 public:
     MultiCastDataStreamerSourceOperator(OperatorBuilderBase* operator_builder,
                                         const int consumer_id,
-                                        std::shared_ptr<MultiCastDataStreamer>& data_streamer);
+                                        std::shared_ptr<MultiCastDataStreamer>& data_streamer,
+                                        const TDataStreamSink& sink);
 
     Status get_block(RuntimeState* state, vectorized::Block* block,
                      SourceState& source_state) override;
 
-    Status prepare(RuntimeState* state) override { return Status::OK(); };
+    Status init(const TDataSink& tsink) override;
 
-    Status open(RuntimeState* state) override { return Status::OK(); };
+    Status prepare(RuntimeState* state) override;
+
+    Status open(RuntimeState* state) override;
+
+    bool runtime_filters_are_ready_or_timeout() override;
 
     Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) override {
         return Status::OK();
@@ -76,6 +85,10 @@ public:
 private:
     const int _consumer_id;
     std::shared_ptr<MultiCastDataStreamer> _multi_cast_data_streamer;
+    TDataStreamSink _t_data_stream_sink;
+
+    vectorized::VExprContextSPtrs _output_expr_contexts;
+    vectorized::VExprContextSPtrs _conjuncts;
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 7c24be8ded..a97128ee32 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -776,7 +776,8 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr
             // 2. create and set the source operator of multi_cast_data_stream_source for new pipeline
             OperatorBuilderPtr source_op =
                     std::make_shared<MultiCastDataStreamerSourceOperatorBuilder>(
-                            next_operator_builder_id(), i, multi_cast_data_streamer);
+                            next_operator_builder_id(), i, multi_cast_data_streamer,
+                            thrift_sink.multi_cast_stream_sink.sinks[i]);
             new_pipeline->add_operator(source_op);
 
             // 3. create and set sink operator of data stream sender for new pipeline
diff --git a/be/src/vec/exec/runtime_filter_consumer_node.cpp b/be/src/vec/exec/runtime_filter_consumer.cpp
similarity index 77%
rename from be/src/vec/exec/runtime_filter_consumer_node.cpp
rename to be/src/vec/exec/runtime_filter_consumer.cpp
index dd631ce66e..ed74d7ecd0 100644
--- a/be/src/vec/exec/runtime_filter_consumer_node.cpp
+++ b/be/src/vec/exec/runtime_filter_consumer.cpp
@@ -15,22 +15,26 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "vec/exec/runtime_filter_consumer_node.h"
+#include "vec/exec/runtime_filter_consumer.h"
 
 namespace doris::vectorized {
 
-RuntimeFilterConsumerNode::RuntimeFilterConsumerNode(ObjectPool* pool, const TPlanNode& tnode,
-                                                     const DescriptorTbl& descs)
-        : ExecNode(pool, tnode, descs), _runtime_filter_descs(tnode.runtime_filters) {}
+RuntimeFilterConsumer::RuntimeFilterConsumer(const int32_t filter_id,
+                                             const std::vector<TRuntimeFilterDesc>& runtime_filters,
+                                             const RowDescriptor& row_descriptor,
+                                             VExprContextSPtrs& conjuncts)
+        : _filter_id(filter_id),
+          _runtime_filter_descs(runtime_filters),
+          _row_descriptor_ref(row_descriptor),
+          _conjuncts_ref(conjuncts) {}
 
-Status RuntimeFilterConsumerNode::init(const TPlanNode& tnode, RuntimeState* state) {
-    RETURN_IF_ERROR(ExecNode::init(tnode, state));
+Status RuntimeFilterConsumer::init(RuntimeState* state) {
     _state = state;
     RETURN_IF_ERROR(_register_runtime_filter());
     return Status::OK();
 }
 
-Status RuntimeFilterConsumerNode::_register_runtime_filter() {
+Status RuntimeFilterConsumer::_register_runtime_filter() {
     int filter_size = _runtime_filter_descs.size();
     _runtime_filter_ctxs.reserve(filter_size);
     _runtime_filter_ready_flag.reserve(filter_size);
@@ -43,14 +47,14 @@ Status RuntimeFilterConsumerNode::_register_runtime_filter() {
             // 1. All BE and FE has been upgraded (e.g. opt_remote_rf)
             // 2. This filter is bloom filter (only bloom filter should be used for merging)
             RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->register_consumer_filter(
-                    filter_desc, _state->query_options(), id(), false));
+                    filter_desc, _state->query_options(), _filter_id, false));
             RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->get_consume_filter(
-                    filter_desc.filter_id, id(), &runtime_filter));
+                    filter_desc.filter_id, _filter_id, &runtime_filter));
         } else {
             RETURN_IF_ERROR(_state->runtime_filter_mgr()->register_consumer_filter(
-                    filter_desc, _state->query_options(), id(), false));
+                    filter_desc, _state->query_options(), _filter_id, false));
             RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_consume_filter(
-                    filter_desc.filter_id, id(), &runtime_filter));
+                    filter_desc.filter_id, _filter_id, &runtime_filter));
         }
         _runtime_filter_ctxs.emplace_back(runtime_filter);
         _runtime_filter_ready_flag.emplace_back(false);
@@ -58,7 +62,7 @@ Status RuntimeFilterConsumerNode::_register_runtime_filter() {
     return Status::OK();
 }
 
-bool RuntimeFilterConsumerNode::runtime_filters_are_ready_or_timeout() {
+bool RuntimeFilterConsumer::runtime_filters_are_ready_or_timeout() {
     if (!_blocked_by_rf) {
         return true;
     }
@@ -72,7 +76,7 @@ bool RuntimeFilterConsumerNode::runtime_filters_are_ready_or_timeout() {
     return true;
 }
 
-Status RuntimeFilterConsumerNode::_acquire_runtime_filter(bool wait) {
+Status RuntimeFilterConsumer::_acquire_runtime_filter(bool wait) {
     SCOPED_TIMER(_acquire_runtime_filter_timer);
     VExprSPtrs vexprs;
     for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
@@ -101,23 +105,23 @@ Status RuntimeFilterConsumerNode::_acquire_runtime_filter(bool wait) {
     return Status::OK();
 }
 
-Status RuntimeFilterConsumerNode::_append_rf_into_conjuncts(const VExprSPtrs& vexprs) {
+Status RuntimeFilterConsumer::_append_rf_into_conjuncts(const VExprSPtrs& vexprs) {
     if (vexprs.empty()) {
         return Status::OK();
     }
 
     for (auto& expr : vexprs) {
         VExprContextSPtr conjunct = VExprContext::create_shared(expr);
-        RETURN_IF_ERROR(conjunct->prepare(_state, _row_descriptor));
+        RETURN_IF_ERROR(conjunct->prepare(_state, _row_descriptor_ref));
         RETURN_IF_ERROR(conjunct->open(_state));
         _rf_vexpr_set.insert(expr);
-        _conjuncts.emplace_back(conjunct);
+        _conjuncts_ref.emplace_back(conjunct);
     }
 
     return Status::OK();
 }
 
-Status RuntimeFilterConsumerNode::try_append_late_arrival_runtime_filter(int* arrived_rf_num) {
+Status RuntimeFilterConsumer::try_append_late_arrival_runtime_filter(int* arrived_rf_num) {
     if (_is_all_rf_applied) {
         *arrived_rf_num = _runtime_filter_descs.size();
         return Status::OK();
@@ -140,12 +144,12 @@ Status RuntimeFilterConsumerNode::try_append_late_arrival_runtime_filter(int* ar
             continue;
         } else if (_runtime_filter_ctxs[i].runtime_filter->is_ready()) {
             RETURN_IF_ERROR(_runtime_filter_ctxs[i].runtime_filter->get_prepared_exprs(
-                    &exprs, _row_descriptor, _state));
+                    &exprs, _row_descriptor_ref, _state));
             ++current_arrived_rf_num;
             _runtime_filter_ctxs[i].apply_mark = true;
         }
     }
-    // 2. Append unapplied runtime filters to vconjunct_ctx_ptr
+    // 2. Append unapplied runtime filters to _conjuncts
     if (!exprs.empty()) {
         RETURN_IF_ERROR(_append_rf_into_conjuncts(exprs));
     }
@@ -157,7 +161,7 @@ Status RuntimeFilterConsumerNode::try_append_late_arrival_runtime_filter(int* ar
     return Status::OK();
 }
 
-void RuntimeFilterConsumerNode::_prepare_rf_timer(RuntimeProfile* profile) {
+void RuntimeFilterConsumer::_prepare_rf_timer(RuntimeProfile* profile) {
     _acquire_runtime_filter_timer = ADD_TIMER(profile, "AcquireRuntimeFilterTime");
 }
 
diff --git a/be/src/vec/exec/runtime_filter_consumer_node.h b/be/src/vec/exec/runtime_filter_consumer.h
similarity index 81%
rename from be/src/vec/exec/runtime_filter_consumer_node.h
rename to be/src/vec/exec/runtime_filter_consumer.h
index 518e0e865c..18e92abc90 100644
--- a/be/src/vec/exec/runtime_filter_consumer_node.h
+++ b/be/src/vec/exec/runtime_filter_consumer.h
@@ -22,14 +22,16 @@
 
 namespace doris::vectorized {
 
-class RuntimeFilterConsumerNode : public ExecNode {
+class RuntimeFilterConsumer {
 public:
-    RuntimeFilterConsumerNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
-    ~RuntimeFilterConsumerNode() override = default;
+    RuntimeFilterConsumer(const int32_t filter_id,
+                          const std::vector<TRuntimeFilterDesc>& runtime_filters,
+                          const RowDescriptor& row_descriptor, VExprContextSPtrs& conjuncts);
+    ~RuntimeFilterConsumer() = default;
 
-    Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
+    Status init(RuntimeState* state);
 
-    // Try append late arrived runtime filters.
+    // Try to append late arrived runtime filters.
     // Return num of filters which are applied already.
     Status try_append_late_arrival_runtime_filter(int* arrived_rf_num);
 
@@ -54,15 +56,23 @@ protected:
         IRuntimeFilter* runtime_filter;
     };
 
-    RuntimeState* _state;
-
     std::vector<RuntimeFilterContext> _runtime_filter_ctxs;
-
-    std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
     // Set to true if the runtime filter is ready.
     std::vector<bool> _runtime_filter_ready_flag;
     doris::Mutex _rf_locks;
     phmap::flat_hash_set<VExprSPtr> _rf_vexpr_set;
+
+private:
+    RuntimeState* _state;
+
+    int32_t _filter_id;
+
+    std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
+
+    const RowDescriptor& _row_descriptor_ref;
+
+    VExprContextSPtrs& _conjuncts_ref;
+
     // True means all runtime filters are applied to scanners
     bool _is_all_rf_applied = true;
     bool _blocked_by_rf = false;
diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp
index 2af6fc87c5..e85775ffa1 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -94,7 +94,9 @@ static bool ignore_cast(SlotDescriptor* slot, VExpr* expr) {
 }
 
 Status VScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
-    RETURN_IF_ERROR(RuntimeFilterConsumerNode::init(tnode, state));
+    RETURN_IF_ERROR(ExecNode::init(tnode, state));
+    RETURN_IF_ERROR(RuntimeFilterConsumer::init(state));
+    _state = state;
     _is_pipeline_scan = state->enable_pipeline_exec();
 
     const TQueryOptions& query_options = state->query_options();
diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h
index 112ca47b54..52284ffe95 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -43,7 +43,7 @@
 #include "runtime/runtime_state.h"
 #include "util/lock.h"
 #include "util/runtime_profile.h"
-#include "vec/exec/runtime_filter_consumer_node.h"
+#include "vec/exec/runtime_filter_consumer.h"
 #include "vec/exec/scan/scanner_context.h"
 #include "vec/exec/scan/vscanner.h"
 #include "vec/runtime/shared_scanner_controller.h"
@@ -88,10 +88,12 @@ struct FilterPredicates {
     std::vector<std::pair<std::string, std::shared_ptr<HybridSetBase>>> in_filters;
 };
 
-class VScanNode : public RuntimeFilterConsumerNode {
+class VScanNode : public ExecNode, public RuntimeFilterConsumer {
 public:
     VScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
-            : RuntimeFilterConsumerNode(pool, tnode, descs) {
+            : ExecNode(pool, tnode, descs),
+              RuntimeFilterConsumer(id(), tnode.runtime_filters, ExecNode::_row_descriptor,
+                                    ExecNode::_conjuncts) {
         if (!tnode.__isset.conjuncts || tnode.conjuncts.empty()) {
             // Which means the request could be fullfilled in a single segment iterator request.
             if (tnode.limit > 0 && tnode.limit < 1024) {
@@ -304,6 +306,8 @@ protected:
     VExprContextSPtrs _stale_expr_ctxs;
     VExprContextSPtrs _common_expr_ctxs_push_down;
 
+    RuntimeState* _state;
+
     // If sort info is set, push limit to each scanner;
     int64_t _limit_per_scanner = -1;
 
diff --git a/be/src/vec/exec/vselect_node.cpp b/be/src/vec/exec/vselect_node.cpp
index 626fd5ce96..ee1628cd19 100644
--- a/be/src/vec/exec/vselect_node.cpp
+++ b/be/src/vec/exec/vselect_node.cpp
@@ -37,34 +37,22 @@ class TPlanNode;
 namespace vectorized {
 
 VSelectNode::VSelectNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
-        : RuntimeFilterConsumerNode(pool, tnode, descs), _child_eos(false) {}
+        : ExecNode(pool, tnode, descs), _child_eos(false) {}
 
 Status VSelectNode::init(const TPlanNode& tnode, RuntimeState* state) {
-    return RuntimeFilterConsumerNode::init(tnode, state);
+    return ExecNode::init(tnode, state);
 }
 
 Status VSelectNode::prepare(RuntimeState* state) {
-    return RuntimeFilterConsumerNode::prepare(state);
+    return ExecNode::prepare(state);
 }
 
 Status VSelectNode::open(RuntimeState* state) {
-    RETURN_IF_ERROR(RuntimeFilterConsumerNode::open(state));
+    RETURN_IF_ERROR(ExecNode::open(state));
     RETURN_IF_ERROR(child(0)->open(state));
     return Status::OK();
 }
 
-Status VSelectNode::alloc_resource(RuntimeState* state) {
-    if (_opened) {
-        return Status::OK();
-    }
-
-    RETURN_IF_ERROR(RuntimeFilterConsumerNode::alloc_resource(state));
-    RETURN_IF_ERROR(_acquire_runtime_filter());
-    RETURN_IF_CANCELLED(state);
-    _opened = true;
-    return Status::OK();
-}
-
 Status VSelectNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_CANCELLED(state);
diff --git a/be/src/vec/exec/vselect_node.h b/be/src/vec/exec/vselect_node.h
index 140009e4b3..1b425e80af 100644
--- a/be/src/vec/exec/vselect_node.h
+++ b/be/src/vec/exec/vselect_node.h
@@ -17,7 +17,7 @@
 
 #pragma once
 #include "common/status.h"
-#include "vec/exec/runtime_filter_consumer_node.h"
+#include "exec/exec_node.h"
 
 namespace doris {
 class DescriptorTbl;
@@ -28,7 +28,7 @@ class TPlanNode;
 namespace vectorized {
 class Block;
 
-class VSelectNode final : public RuntimeFilterConsumerNode {
+class VSelectNode final : public ExecNode {
 public:
     VSelectNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
     Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
@@ -37,13 +37,9 @@ public:
     Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) override;
     Status close(RuntimeState* state) override;
     Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override;
-
-    Status alloc_resource(RuntimeState* state) override;
-
 private:
     // true if last get_next() call on child signalled eos
     bool _child_eos;
-    bool _opened = false;
 };
 } // namespace vectorized
 } // namespace doris
\ No newline at end of file
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index af5d4d26a3..c78a7900a9 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -153,6 +153,18 @@ struct TDataStreamSink {
   2: required Partitions.TDataPartition output_partition
 
   3: optional bool ignore_not_found
+
+    // per-destination projections
+    4: optional list<Exprs.TExpr> output_exprs
+
+    // project output tuple id
+    5: optional Types.TTupleId output_tuple_id
+
+    // per-destination filters
+    6: optional list<Exprs.TExpr> conjuncts
+
+    // per-destination runtime filters
+    7: optional list<PlanNodes.TRuntimeFilterDesc> runtime_filters
 }
 
 struct TMultiCastDataStreamSink {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org