You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/06/28 13:09:52 UTC
[doris] branch runtimefilter_multi_send updated: [feature](runtime_filter) MultiCastSender supporting runtime filter on be (#21304)
This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch runtimefilter_multi_send
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/runtimefilter_multi_send by this push:
new 254ec1ba53 [feature](runtime_filter) MultiCastSender supporting runtime filter on be (#21304)
254ec1ba53 is described below
commit 254ec1ba5304ea9ef557e0757f7d1b7cb51df008
Author: Jerry Hu <mr...@gmail.com>
AuthorDate: Wed Jun 28 21:09:43 2023 +0800
[feature](runtime_filter) MultiCastSender supporting runtime filter on be (#21304)
---
.../exec/multi_cast_data_stream_source.cpp | 61 +++++++++++++++++++---
.../pipeline/exec/multi_cast_data_stream_source.h | 23 ++++++--
be/src/pipeline/pipeline_fragment_context.cpp | 3 +-
...nsumer_node.cpp => runtime_filter_consumer.cpp} | 44 +++++++++-------
...r_consumer_node.h => runtime_filter_consumer.h} | 28 ++++++----
be/src/vec/exec/scan/vscan_node.cpp | 4 +-
be/src/vec/exec/scan/vscan_node.h | 10 ++--
be/src/vec/exec/vselect_node.cpp | 20 ++-----
be/src/vec/exec/vselect_node.h | 8 +--
gensrc/thrift/DataSinks.thrift | 12 +++++
10 files changed, 146 insertions(+), 67 deletions(-)
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index 06211faf52..18ca62ea15 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -27,14 +27,16 @@
namespace doris::pipeline {
MultiCastDataStreamerSourceOperatorBuilder::MultiCastDataStreamerSourceOperatorBuilder(
- int32_t id, const int consumer_id, std::shared_ptr<MultiCastDataStreamer>& data_streamer)
+ int32_t id, const int consumer_id, std::shared_ptr<MultiCastDataStreamer>& data_streamer,
+ const TDataStreamSink& sink)
: OperatorBuilderBase(id, "MultiCastDataStreamerSourceOperator"),
_consumer_id(consumer_id),
- _multi_cast_data_streamer(data_streamer) {};
+ _multi_cast_data_streamer(data_streamer),
+ _t_data_stream_sink(sink) {}
OperatorPtr MultiCastDataStreamerSourceOperatorBuilder::build_operator() {
- return std::make_shared<MultiCastDataStreamerSourceOperator>(this, _consumer_id,
- _multi_cast_data_streamer);
+ return std::make_shared<MultiCastDataStreamerSourceOperator>(
+ this, _consumer_id, _multi_cast_data_streamer, _t_data_stream_sink);
}
const RowDescriptor& MultiCastDataStreamerSourceOperatorBuilder::row_desc() {
@@ -43,10 +45,44 @@ const RowDescriptor& MultiCastDataStreamerSourceOperatorBuilder::row_desc() {
MultiCastDataStreamerSourceOperator::MultiCastDataStreamerSourceOperator(
OperatorBuilderBase* operator_builder, const int consumer_id,
- std::shared_ptr<MultiCastDataStreamer>& data_streamer)
+ std::shared_ptr<MultiCastDataStreamer>& data_streamer, const TDataStreamSink& sink)
: OperatorBase(operator_builder),
+ vectorized::RuntimeFilterConsumer(sink.dest_node_id, sink.runtime_filters,
+ data_streamer->row_desc(), _conjuncts),
_consumer_id(consumer_id),
- _multi_cast_data_streamer(data_streamer) {};
+ _multi_cast_data_streamer(data_streamer),
+ _t_data_stream_sink(sink) {}
+
+// Initializes the operator from the thrift sink description captured at construction
+// time (_t_data_stream_sink), after running the base-class OperatorBase::init().
+// Expression trees are built only for the optional fields that are actually set;
+// a field that is not set leaves the matching context list untouched.
+// Returns the first error reported by OperatorBase::init() or by expression creation.
+Status MultiCastDataStreamerSourceOperator::init(const TDataSink& tsink) {
+ RETURN_IF_ERROR(OperatorBase::init(tsink));
+ // Per-destination projection: evaluated in get_block() to produce the output block.
+ if (_t_data_stream_sink.__isset.output_exprs) {
+ RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_data_stream_sink.output_exprs,
+ _output_expr_contexts));
+ }
+
+ // Per-destination filters: applied in get_block() through VExprContext::filter_block().
+ if (_t_data_stream_sink.__isset.conjuncts) {
+ RETURN_IF_ERROR(
+ vectorized::VExpr::create_expr_trees(_t_data_stream_sink.conjuncts, _conjuncts));
+ }
+
+ return Status::OK();
+}
+
+// Prepares the operator: hands the runtime state to the runtime-filter consumer and
+// prepares the per-destination conjuncts and output expressions against row_desc().
+// Returns the first error encountered.
+Status MultiCastDataStreamerSourceOperator::prepare(doris::RuntimeState* state) {
+ // RuntimeFilterConsumer::init() stores `state` and registers every runtime filter
+ // itself, so _register_runtime_filter() must not be called a second time here:
+ // doing so would append duplicate entries to the filter context / ready-flag lists
+ // and its Status would be silently dropped.
+ RETURN_IF_ERROR(vectorized::RuntimeFilterConsumer::init(state));
+ RETURN_IF_ERROR(vectorized::VExpr::prepare(_conjuncts, state, row_desc()));
+ RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_expr_contexts, state, row_desc()));
+ return Status::OK();
+}
+
+// Opens the operator: opens the expression contexts built in init() and then acquires
+// the runtime filters targeted at this consumer. Returns the first error encountered.
+Status MultiCastDataStreamerSourceOperator::open(doris::RuntimeState* state) {
+ // The contexts are created in init() and prepared in prepare(); they must also be
+ // opened before get_block() evaluates them. Do this before acquiring runtime filters:
+ // conjuncts added by the runtime-filter consumer are already prepared and opened when
+ // they are appended to _conjuncts, so opening first avoids touching them again.
+ for (auto& ctx : _output_expr_contexts) {
+ RETURN_IF_ERROR(ctx->open(state));
+ }
+ for (auto& ctx : _conjuncts) {
+ RETURN_IF_ERROR(ctx->open(state));
+ }
+ // NOTE(review): _acquire_runtime_filter() takes `bool wait`; passing `state` relies on
+ // the implicit pointer-to-bool conversion (true for a non-null state). Confirm that
+ // waiting is intended here and spell the argument out explicitly.
+ return _acquire_runtime_filter(state);
+}
+
+// Declared `override` in the header; simply forwards to the RuntimeFilterConsumer
+// implementation, which is selected explicitly by the qualified call.
+bool MultiCastDataStreamerSourceOperator::runtime_filters_are_ready_or_timeout() {
+ return vectorized::RuntimeFilterConsumer::runtime_filters_are_ready_or_timeout();
+}
bool MultiCastDataStreamerSourceOperator::can_read() {
return _multi_cast_data_streamer->can_read(_consumer_id);
@@ -56,6 +92,19 @@ Status MultiCastDataStreamerSourceOperator::get_block(RuntimeState* state, vecto
SourceState& source_state) {
bool eos = false;
_multi_cast_data_streamer->pull(_consumer_id, block, &eos);
+ if (!_output_expr_contexts.empty()) {
+ vectorized::Block output_block;
+ RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
+ _output_expr_contexts, *block, &output_block));
+ materialize_block_inplace(output_block);
+ block->swap(output_block);
+ }
+
+ if (!_conjuncts.empty()) {
+ RETURN_IF_ERROR(
+ vectorized::VExprContext::filter_block(_conjuncts, block, block->columns()));
+ }
+
if (eos) {
source_state = SourceState::FINISHED;
}
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h
index 15bd320b89..b2c7ca9e55 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h
@@ -22,6 +22,7 @@
#include "common/status.h"
#include "operator.h"
+#include "vec/exec/runtime_filter_consumer.h"
namespace doris {
class ExecNode;
@@ -37,7 +38,8 @@ class MultiCastDataStreamer;
class MultiCastDataStreamerSourceOperatorBuilder final : public OperatorBuilderBase {
public:
MultiCastDataStreamerSourceOperatorBuilder(int32_t id, const int consumer_id,
- std::shared_ptr<MultiCastDataStreamer>&);
+ std::shared_ptr<MultiCastDataStreamer>&,
+ const TDataStreamSink&);
bool is_source() const override { return true; }
@@ -48,20 +50,27 @@ public:
private:
const int _consumer_id;
std::shared_ptr<MultiCastDataStreamer> _multi_cast_data_streamer;
+ TDataStreamSink _t_data_stream_sink;
};
-class MultiCastDataStreamerSourceOperator final : public OperatorBase {
+class MultiCastDataStreamerSourceOperator final : public OperatorBase,
+ public vectorized::RuntimeFilterConsumer {
public:
MultiCastDataStreamerSourceOperator(OperatorBuilderBase* operator_builder,
const int consumer_id,
- std::shared_ptr<MultiCastDataStreamer>& data_streamer);
+ std::shared_ptr<MultiCastDataStreamer>& data_streamer,
+ const TDataStreamSink& sink);
Status get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) override;
- Status prepare(RuntimeState* state) override { return Status::OK(); };
+ Status init(const TDataSink& tsink) override;
- Status open(RuntimeState* state) override { return Status::OK(); };
+ Status prepare(RuntimeState* state) override;
+
+ Status open(RuntimeState* state) override;
+
+ bool runtime_filters_are_ready_or_timeout() override;
Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) override {
return Status::OK();
@@ -76,6 +85,10 @@ public:
private:
const int _consumer_id;
std::shared_ptr<MultiCastDataStreamer> _multi_cast_data_streamer;
+ TDataStreamSink _t_data_stream_sink;
+
+ vectorized::VExprContextSPtrs _output_expr_contexts;
+ vectorized::VExprContextSPtrs _conjuncts;
};
} // namespace pipeline
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 7c24be8ded..a97128ee32 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -776,7 +776,8 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr
// 2. create and set the source operator of multi_cast_data_stream_source for new pipeline
OperatorBuilderPtr source_op =
std::make_shared<MultiCastDataStreamerSourceOperatorBuilder>(
- next_operator_builder_id(), i, multi_cast_data_streamer);
+ next_operator_builder_id(), i, multi_cast_data_streamer,
+ thrift_sink.multi_cast_stream_sink.sinks[i]);
new_pipeline->add_operator(source_op);
// 3. create and set sink operator of data stream sender for new pipeline
diff --git a/be/src/vec/exec/runtime_filter_consumer_node.cpp b/be/src/vec/exec/runtime_filter_consumer.cpp
similarity index 77%
rename from be/src/vec/exec/runtime_filter_consumer_node.cpp
rename to be/src/vec/exec/runtime_filter_consumer.cpp
index dd631ce66e..ed74d7ecd0 100644
--- a/be/src/vec/exec/runtime_filter_consumer_node.cpp
+++ b/be/src/vec/exec/runtime_filter_consumer.cpp
@@ -15,22 +15,26 @@
// specific language governing permissions and limitations
// under the License.
-#include "vec/exec/runtime_filter_consumer_node.h"
+#include "vec/exec/runtime_filter_consumer.h"
namespace doris::vectorized {
-RuntimeFilterConsumerNode::RuntimeFilterConsumerNode(ObjectPool* pool, const TPlanNode& tnode,
- const DescriptorTbl& descs)
- : ExecNode(pool, tnode, descs), _runtime_filter_descs(tnode.runtime_filters) {}
+// Builds a runtime-filter consumer that no longer derives from ExecNode.
+//   filter_id        id handed to the runtime filter manager when registering and
+//                    looking up consumer filters
+//   runtime_filters  descriptors of the filters to consume (copied into this object)
+//   row_descriptor   used to prepare runtime-filter expressions (held by reference)
+//   conjuncts        list that runtime-filter conjuncts are appended to (held by reference)
+// Both references are stored as-is, so the referenced objects must outlive this consumer.
+RuntimeFilterConsumer::RuntimeFilterConsumer(const int32_t filter_id,
+ const std::vector<TRuntimeFilterDesc>& runtime_filters,
+ const RowDescriptor& row_descriptor,
+ VExprContextSPtrs& conjuncts)
+ : _filter_id(filter_id),
+ _runtime_filter_descs(runtime_filters),
+ _row_descriptor_ref(row_descriptor),
+ _conjuncts_ref(conjuncts) {}
-Status RuntimeFilterConsumerNode::init(const TPlanNode& tnode, RuntimeState* state) {
- RETURN_IF_ERROR(ExecNode::init(tnode, state));
+Status RuntimeFilterConsumer::init(RuntimeState* state) {
_state = state;
RETURN_IF_ERROR(_register_runtime_filter());
return Status::OK();
}
-Status RuntimeFilterConsumerNode::_register_runtime_filter() {
+Status RuntimeFilterConsumer::_register_runtime_filter() {
int filter_size = _runtime_filter_descs.size();
_runtime_filter_ctxs.reserve(filter_size);
_runtime_filter_ready_flag.reserve(filter_size);
@@ -43,14 +47,14 @@ Status RuntimeFilterConsumerNode::_register_runtime_filter() {
// 1. All BE and FE has been upgraded (e.g. opt_remote_rf)
// 2. This filter is bloom filter (only bloom filter should be used for merging)
RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->register_consumer_filter(
- filter_desc, _state->query_options(), id(), false));
+ filter_desc, _state->query_options(), _filter_id, false));
RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->get_consume_filter(
- filter_desc.filter_id, id(), &runtime_filter));
+ filter_desc.filter_id, _filter_id, &runtime_filter));
} else {
RETURN_IF_ERROR(_state->runtime_filter_mgr()->register_consumer_filter(
- filter_desc, _state->query_options(), id(), false));
+ filter_desc, _state->query_options(), _filter_id, false));
RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_consume_filter(
- filter_desc.filter_id, id(), &runtime_filter));
+ filter_desc.filter_id, _filter_id, &runtime_filter));
}
_runtime_filter_ctxs.emplace_back(runtime_filter);
_runtime_filter_ready_flag.emplace_back(false);
@@ -58,7 +62,7 @@ Status RuntimeFilterConsumerNode::_register_runtime_filter() {
return Status::OK();
}
-bool RuntimeFilterConsumerNode::runtime_filters_are_ready_or_timeout() {
+bool RuntimeFilterConsumer::runtime_filters_are_ready_or_timeout() {
if (!_blocked_by_rf) {
return true;
}
@@ -72,7 +76,7 @@ bool RuntimeFilterConsumerNode::runtime_filters_are_ready_or_timeout() {
return true;
}
-Status RuntimeFilterConsumerNode::_acquire_runtime_filter(bool wait) {
+Status RuntimeFilterConsumer::_acquire_runtime_filter(bool wait) {
SCOPED_TIMER(_acquire_runtime_filter_timer);
VExprSPtrs vexprs;
for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
@@ -101,23 +105,23 @@ Status RuntimeFilterConsumerNode::_acquire_runtime_filter(bool wait) {
return Status::OK();
}
-Status RuntimeFilterConsumerNode::_append_rf_into_conjuncts(const VExprSPtrs& vexprs) {
+Status RuntimeFilterConsumer::_append_rf_into_conjuncts(const VExprSPtrs& vexprs) {
if (vexprs.empty()) {
return Status::OK();
}
for (auto& expr : vexprs) {
VExprContextSPtr conjunct = VExprContext::create_shared(expr);
- RETURN_IF_ERROR(conjunct->prepare(_state, _row_descriptor));
+ RETURN_IF_ERROR(conjunct->prepare(_state, _row_descriptor_ref));
RETURN_IF_ERROR(conjunct->open(_state));
_rf_vexpr_set.insert(expr);
- _conjuncts.emplace_back(conjunct);
+ _conjuncts_ref.emplace_back(conjunct);
}
return Status::OK();
}
-Status RuntimeFilterConsumerNode::try_append_late_arrival_runtime_filter(int* arrived_rf_num) {
+Status RuntimeFilterConsumer::try_append_late_arrival_runtime_filter(int* arrived_rf_num) {
if (_is_all_rf_applied) {
*arrived_rf_num = _runtime_filter_descs.size();
return Status::OK();
@@ -140,12 +144,12 @@ Status RuntimeFilterConsumerNode::try_append_late_arrival_runtime_filter(int* ar
continue;
} else if (_runtime_filter_ctxs[i].runtime_filter->is_ready()) {
RETURN_IF_ERROR(_runtime_filter_ctxs[i].runtime_filter->get_prepared_exprs(
- &exprs, _row_descriptor, _state));
+ &exprs, _row_descriptor_ref, _state));
++current_arrived_rf_num;
_runtime_filter_ctxs[i].apply_mark = true;
}
}
- // 2. Append unapplied runtime filters to vconjunct_ctx_ptr
+ // 2. Append unapplied runtime filters to _conjuncts
if (!exprs.empty()) {
RETURN_IF_ERROR(_append_rf_into_conjuncts(exprs));
}
@@ -157,7 +161,7 @@ Status RuntimeFilterConsumerNode::try_append_late_arrival_runtime_filter(int* ar
return Status::OK();
}
-void RuntimeFilterConsumerNode::_prepare_rf_timer(RuntimeProfile* profile) {
+void RuntimeFilterConsumer::_prepare_rf_timer(RuntimeProfile* profile) {
_acquire_runtime_filter_timer = ADD_TIMER(profile, "AcquireRuntimeFilterTime");
}
diff --git a/be/src/vec/exec/runtime_filter_consumer_node.h b/be/src/vec/exec/runtime_filter_consumer.h
similarity index 81%
rename from be/src/vec/exec/runtime_filter_consumer_node.h
rename to be/src/vec/exec/runtime_filter_consumer.h
index 518e0e865c..18e92abc90 100644
--- a/be/src/vec/exec/runtime_filter_consumer_node.h
+++ b/be/src/vec/exec/runtime_filter_consumer.h
@@ -22,14 +22,16 @@
namespace doris::vectorized {
-class RuntimeFilterConsumerNode : public ExecNode {
+class RuntimeFilterConsumer {
public:
- RuntimeFilterConsumerNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
- ~RuntimeFilterConsumerNode() override = default;
+ RuntimeFilterConsumer(const int32_t filter_id,
+ const std::vector<TRuntimeFilterDesc>& runtime_filters,
+ const RowDescriptor& row_descriptor, VExprContextSPtrs& conjuncts);
+ ~RuntimeFilterConsumer() = default;
- Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
+ Status init(RuntimeState* state);
- // Try append late arrived runtime filters.
+ // Try to append late arrived runtime filters.
// Return num of filters which are applied already.
Status try_append_late_arrival_runtime_filter(int* arrived_rf_num);
@@ -54,15 +56,23 @@ protected:
IRuntimeFilter* runtime_filter;
};
- RuntimeState* _state;
-
std::vector<RuntimeFilterContext> _runtime_filter_ctxs;
-
- std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
// Set to true if the runtime filter is ready.
std::vector<bool> _runtime_filter_ready_flag;
doris::Mutex _rf_locks;
phmap::flat_hash_set<VExprSPtr> _rf_vexpr_set;
+
+private:
+ RuntimeState* _state;
+
+ int32_t _filter_id;
+
+ std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
+
+ const RowDescriptor& _row_descriptor_ref;
+
+ VExprContextSPtrs& _conjuncts_ref;
+
// True means all runtime filters are applied to scanners
bool _is_all_rf_applied = true;
bool _blocked_by_rf = false;
diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp
index 2af6fc87c5..e85775ffa1 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -94,7 +94,9 @@ static bool ignore_cast(SlotDescriptor* slot, VExpr* expr) {
}
Status VScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
- RETURN_IF_ERROR(RuntimeFilterConsumerNode::init(tnode, state));
+ RETURN_IF_ERROR(ExecNode::init(tnode, state));
+ RETURN_IF_ERROR(RuntimeFilterConsumer::init(state));
+ _state = state;
_is_pipeline_scan = state->enable_pipeline_exec();
const TQueryOptions& query_options = state->query_options();
diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h
index 112ca47b54..52284ffe95 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -43,7 +43,7 @@
#include "runtime/runtime_state.h"
#include "util/lock.h"
#include "util/runtime_profile.h"
-#include "vec/exec/runtime_filter_consumer_node.h"
+#include "vec/exec/runtime_filter_consumer.h"
#include "vec/exec/scan/scanner_context.h"
#include "vec/exec/scan/vscanner.h"
#include "vec/runtime/shared_scanner_controller.h"
@@ -88,10 +88,12 @@ struct FilterPredicates {
std::vector<std::pair<std::string, std::shared_ptr<HybridSetBase>>> in_filters;
};
-class VScanNode : public RuntimeFilterConsumerNode {
+class VScanNode : public ExecNode, public RuntimeFilterConsumer {
public:
VScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
- : RuntimeFilterConsumerNode(pool, tnode, descs) {
+ : ExecNode(pool, tnode, descs),
+ RuntimeFilterConsumer(id(), tnode.runtime_filters, ExecNode::_row_descriptor,
+ ExecNode::_conjuncts) {
if (!tnode.__isset.conjuncts || tnode.conjuncts.empty()) {
// Which means the request could be fullfilled in a single segment iterator request.
if (tnode.limit > 0 && tnode.limit < 1024) {
@@ -304,6 +306,8 @@ protected:
VExprContextSPtrs _stale_expr_ctxs;
VExprContextSPtrs _common_expr_ctxs_push_down;
+ RuntimeState* _state;
+
// If sort info is set, push limit to each scanner;
int64_t _limit_per_scanner = -1;
diff --git a/be/src/vec/exec/vselect_node.cpp b/be/src/vec/exec/vselect_node.cpp
index 626fd5ce96..ee1628cd19 100644
--- a/be/src/vec/exec/vselect_node.cpp
+++ b/be/src/vec/exec/vselect_node.cpp
@@ -37,34 +37,22 @@ class TPlanNode;
namespace vectorized {
VSelectNode::VSelectNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
- : RuntimeFilterConsumerNode(pool, tnode, descs), _child_eos(false) {}
+ : ExecNode(pool, tnode, descs), _child_eos(false) {}
Status VSelectNode::init(const TPlanNode& tnode, RuntimeState* state) {
- return RuntimeFilterConsumerNode::init(tnode, state);
+ return ExecNode::init(tnode, state);
}
Status VSelectNode::prepare(RuntimeState* state) {
- return RuntimeFilterConsumerNode::prepare(state);
+ return ExecNode::prepare(state);
}
Status VSelectNode::open(RuntimeState* state) {
- RETURN_IF_ERROR(RuntimeFilterConsumerNode::open(state));
+ RETURN_IF_ERROR(ExecNode::open(state));
RETURN_IF_ERROR(child(0)->open(state));
return Status::OK();
}
-Status VSelectNode::alloc_resource(RuntimeState* state) {
- if (_opened) {
- return Status::OK();
- }
-
- RETURN_IF_ERROR(RuntimeFilterConsumerNode::alloc_resource(state));
- RETURN_IF_ERROR(_acquire_runtime_filter());
- RETURN_IF_CANCELLED(state);
- _opened = true;
- return Status::OK();
-}
-
Status VSelectNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_CANCELLED(state);
diff --git a/be/src/vec/exec/vselect_node.h b/be/src/vec/exec/vselect_node.h
index 140009e4b3..1b425e80af 100644
--- a/be/src/vec/exec/vselect_node.h
+++ b/be/src/vec/exec/vselect_node.h
@@ -17,7 +17,7 @@
#pragma once
#include "common/status.h"
-#include "vec/exec/runtime_filter_consumer_node.h"
+#include "exec/exec_node.h"
namespace doris {
class DescriptorTbl;
@@ -28,7 +28,7 @@ class TPlanNode;
namespace vectorized {
class Block;
-class VSelectNode final : public RuntimeFilterConsumerNode {
+class VSelectNode final : public ExecNode {
public:
VSelectNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
@@ -37,13 +37,9 @@ public:
Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) override;
Status close(RuntimeState* state) override;
Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override;
-
- Status alloc_resource(RuntimeState* state) override;
-
private:
// true if last get_next() call on child signalled eos
bool _child_eos;
- bool _opened = false;
};
} // namespace vectorized
} // namespace doris
\ No newline at end of file
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index af5d4d26a3..c78a7900a9 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -153,6 +153,18 @@ struct TDataStreamSink {
2: required Partitions.TDataPartition output_partition
3: optional bool ignore_not_found
+
+ // per-destination projections
+ 4: optional list<Exprs.TExpr> output_exprs
+
+ // project output tuple id
+ 5: optional Types.TTupleId output_tuple_id
+
+ // per-destination filters
+ 6: optional list<Exprs.TExpr> conjuncts
+
+ // per-destination runtime filters
+ 7: optional list<PlanNodes.TRuntimeFilterDesc> runtime_filters
}
struct TMultiCastDataStreamSink {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org