You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@doris.apache.org by ga...@apache.org on 2022/12/18 13:48:06 UTC

[doris] branch master updated: [Pipeline](runtime filter) Support runtime filters on pipeline engine (#15040)

This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 13bc8c2ef8 [Pipeline](runtime filter) Support runtime filters on pipeline engine (#15040)
13bc8c2ef8 is described below

commit 13bc8c2ef89fe598adf329a3c1a52c7569f81036
Author: Gabriel <ga...@gmail.com>
AuthorDate: Sun Dec 18 21:48:00 2022 +0800

    [Pipeline](runtime filter) Support runtime filters on pipeline engine (#15040)
---
 be/src/common/status.h                        |   4 +
 be/src/exprs/runtime_filter.cpp               | 101 ++++++++++++++++++++++----
 be/src/exprs/runtime_filter.h                 |  44 +++++++++--
 be/src/exprs/runtime_filter_rpc.cpp           |   1 +
 be/src/pipeline/exec/operator.cpp             |  15 ----
 be/src/pipeline/exec/operator.h               |   6 +-
 be/src/pipeline/exec/scan_operator.cpp        |  18 +----
 be/src/pipeline/exec/scan_operator.h          |   4 +-
 be/src/pipeline/pipeline.cpp                  |  14 +---
 be/src/pipeline/pipeline.h                    |  15 ++--
 be/src/pipeline/pipeline_fragment_context.cpp |   4 -
 be/src/pipeline/pipeline_fragment_context.h   |   6 ++
 be/src/pipeline/pipeline_task.cpp             |  52 ++++++-------
 be/src/pipeline/pipeline_task.h               |  20 ++---
 be/src/pipeline/task_scheduler.cpp            |  25 +++++--
 be/src/runtime/fragment_mgr.cpp               |  56 +++++++++++---
 be/src/runtime/runtime_filter_mgr.cpp         |   2 +
 be/src/runtime/runtime_state.h                |   5 +-
 be/src/vec/exec/scan/vscan_node.cpp           |  53 ++++++++++++--
 be/src/vec/exec/scan/vscan_node.h             |   6 ++
 be/src/vec/exec/vexchange_node.cpp            |  16 +++-
 be/src/vec/exec/vexchange_node.h              |   1 +
 be/src/vec/runtime/vdata_stream_recvr.h       |   4 +-
 gensrc/proto/internal_service.proto           |   2 +
 24 files changed, 323 insertions(+), 151 deletions(-)

diff --git a/be/src/common/status.h b/be/src/common/status.h
index 0cfc4517be..0970447a45 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -246,6 +246,7 @@ E(ROWSET_RENAME_FILE_FAILED, -3116);
 E(SEGCOMPACTION_INIT_READER, -3117);
 E(SEGCOMPACTION_INIT_WRITER, -3118);
 E(SEGCOMPACTION_FAILED, -3119);
+E(PIP_WAIT_FOR_RF, -3120);
 #undef E
 }; // namespace ErrorCode
 
@@ -367,6 +368,7 @@ public:
     ERROR_CTOR(NotSupported, NOT_IMPLEMENTED_ERROR)
     ERROR_CTOR(EndOfFile, END_OF_FILE)
     ERROR_CTOR(InternalError, INTERNAL_ERROR)
+    ERROR_CTOR(WaitForRf, PIP_WAIT_FOR_RF)
     ERROR_CTOR(RuntimeError, RUNTIME_ERROR)
     ERROR_CTOR(Cancelled, CANCELLED)
     ERROR_CTOR(MemoryLimitExceeded, MEM_LIMIT_EXCEEDED)
@@ -386,6 +388,8 @@ public:
 
     bool ok() const { return _code == ErrorCode::OK; }
 
+    bool is_blocked_by_rf() const { return _code == ErrorCode::PIP_WAIT_FOR_RF; }
+
     bool is_io_error() const {
         return ErrorCode::IO_ERROR == _code || ErrorCode::READ_UNENOUGH == _code ||
                ErrorCode::CHECKSUM_ERROR == _code || ErrorCode::FILE_DATA_ERROR == _code ||
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index d584f06d17..a2d7c6118c 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -36,7 +36,6 @@
 #include "runtime/large_int_value.h"
 #include "runtime/primitive_type.h"
 #include "runtime/runtime_filter_mgr.h"
-#include "runtime/runtime_state.h"
 #include "util/runtime_profile.h"
 #include "util/string_parser.hpp"
 #include "vec/columns/column.h"
@@ -1205,7 +1204,7 @@ Status IRuntimeFilter::get_prepared_context(std::vector<ExprContext*>* push_expr
     if (_is_ignored) {
         return Status::OK();
     }
-    DCHECK(_is_ready);
+    DCHECK(!_state->enable_pipeline_exec() && _rf_state == RuntimeFilterState::READY);
     DCHECK(is_consumer());
     std::lock_guard<std::mutex> guard(_inner_mutex);
 
@@ -1225,7 +1224,9 @@ Status IRuntimeFilter::get_prepared_vexprs(std::vector<doris::vectorized::VExpr*
     if (_is_ignored) {
         return Status::OK();
     }
-    DCHECK(_is_ready);
+    DCHECK((!_state->enable_pipeline_exec() && _rf_state == RuntimeFilterState::READY) ||
+           (_state->enable_pipeline_exec() &&
+            _rf_state_atomic.load(std::memory_order_acquire) == RuntimeFilterState::READY));
     DCHECK(is_consumer());
     std::lock_guard<std::mutex> guard(_inner_mutex);
 
@@ -1239,29 +1240,95 @@ Status IRuntimeFilter::get_prepared_vexprs(std::vector<doris::vectorized::VExpr*
 
 bool IRuntimeFilter::await() {
     DCHECK(is_consumer());
-    SCOPED_TIMER(_await_time_cost);
     // bitmap filter is precise filter and only filter once, so it must be applied.
     int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER
                                     ? _state->query_options().query_timeout
                                     : _state->runtime_filter_wait_time_ms();
-    std::unique_lock<std::mutex> lock(_inner_mutex);
-    if (!_is_ready) {
-        int64_t ms_since_registration = MonotonicMillis() - registration_time_;
-        int64_t ms_remaining = wait_times_ms - ms_since_registration;
-        if (ms_remaining <= 0) {
-            return _is_ready;
+    if (_state->enable_pipeline_exec()) {
+        auto expected = _rf_state_atomic.load(std::memory_order_acquire);
+        if (expected == RuntimeFilterState::NOT_READY) {
+            if (!_rf_state_atomic.compare_exchange_strong(
+                        expected,
+                        MonotonicMillis() - registration_time_ >= wait_times_ms
+                                ? RuntimeFilterState::TIME_OUT
+                                : RuntimeFilterState::NOT_READY,
+                        std::memory_order_acq_rel)) {
+                DCHECK(expected == RuntimeFilterState::READY);
+                return true;
+            }
+            return false;
+        } else if (expected == RuntimeFilterState::TIME_OUT) {
+            return false;
+        }
+    } else {
+        SCOPED_TIMER(_await_time_cost);
+        std::unique_lock<std::mutex> lock(_inner_mutex);
+        if (_rf_state != RuntimeFilterState::READY) {
+            int64_t ms_since_registration = MonotonicMillis() - registration_time_;
+            int64_t ms_remaining = wait_times_ms - ms_since_registration;
+            _rf_state = RuntimeFilterState::TIME_OUT;
+            if (ms_remaining <= 0) {
+                return false;
+            }
+            return _inner_cv.wait_for(lock, std::chrono::milliseconds(ms_remaining),
+                                      [this] { return _rf_state == RuntimeFilterState::READY; });
         }
-        return _inner_cv.wait_for(lock, std::chrono::milliseconds(ms_remaining),
-                                  [this] { return this->_is_ready; });
     }
     return true;
 }
 
+bool IRuntimeFilter::is_ready_or_timeout() {
+    DCHECK(is_consumer());
+    auto cur_state = _rf_state_atomic.load(std::memory_order_acquire);
+    // bitmap filter is precise filter and only filter once, so it must be applied.
+    int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER
+                                    ? _state->query_options().query_timeout
+                                    : _state->runtime_filter_wait_time_ms();
+    int64_t ms_since_registration = MonotonicMillis() - registration_time_;
+    if (!_state->enable_pipeline_exec()) {
+        _rf_state = RuntimeFilterState::TIME_OUT;
+        return true;
+    } else if (is_ready()) {
+        if (cur_state == RuntimeFilterState::NOT_READY) {
+            _profile->add_info_string("EffectTime", std::to_string(ms_since_registration) + " ms");
+        }
+        return true;
+    } else {
+        if (cur_state == RuntimeFilterState::NOT_READY) {
+            _profile->add_info_string("EffectTime", std::to_string(ms_since_registration) + " ms");
+        }
+        if (is_ready()) {
+            return true;
+        }
+        bool timeout = wait_times_ms <= ms_since_registration;
+        auto expected = RuntimeFilterState::NOT_READY;
+        if (timeout) {
+            if (!_rf_state_atomic.compare_exchange_strong(expected, RuntimeFilterState::TIME_OUT,
+                                                          std::memory_order_acq_rel)) {
+                DCHECK(expected == RuntimeFilterState::READY ||
+                       expected == RuntimeFilterState::TIME_OUT);
+                return true;
+            }
+            return true;
+        }
+        if (!_rf_state_atomic.compare_exchange_strong(expected, RuntimeFilterState::NOT_READY,
+                                                      std::memory_order_acq_rel)) {
+            DCHECK(expected == RuntimeFilterState::READY);
+            return true;
+        }
+        return false;
+    }
+}
+
 void IRuntimeFilter::signal() {
     DCHECK(is_consumer());
-    std::unique_lock<std::mutex> lock(_inner_mutex);
-    _is_ready = true;
-    _inner_cv.notify_all();
+    if (_state->enable_pipeline_exec()) {
+        _rf_state_atomic.store(RuntimeFilterState::READY);
+    } else {
+        std::unique_lock<std::mutex> lock(_inner_mutex);
+        _rf_state = RuntimeFilterState::READY;
+        _inner_cv.notify_all();
+    }
 
     if (_wrapper->get_real_type() == RuntimeFilterType::IN_FILTER) {
         _profile->add_info_string("InFilterSize", std::to_string(_wrapper->get_in_filter_size()));
@@ -1412,7 +1479,9 @@ void IRuntimeFilter::init_profile(RuntimeProfile* parent_profile) {
     _profile.reset(new RuntimeProfile(fmt::format("RuntimeFilter: (id = {}, type = {})", _filter_id,
                                                   ::doris::to_string(_runtime_filter_type))));
     parent_profile->add_child(_profile.get(), true, nullptr);
-    _await_time_cost = ADD_TIMER(_profile, "AWaitTimeCost");
+    if (!_state->enable_pipeline_exec()) {
+        _await_time_cost = ADD_TIMER(_profile, "AWaitTimeCost");
+    }
     _profile->add_info_string("Info", _format_status());
     if (_runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
         update_runtime_filter_type_to_profile();
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 5cd08aca46..e1a4b236e3 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include "exprs/expr_context.h"
+#include "runtime/runtime_state.h"
 #include "util/runtime_profile.h"
 #include "util/time.h"
 #include "util/uid_util.h"
@@ -120,6 +121,12 @@ struct MergeRuntimeFilterParams {
     butil::IOBufAsZeroCopyInputStream* data;
 };
 
+enum RuntimeFilterState {
+    READY,
+    NOT_READY,
+    TIME_OUT,
+};
+
 /// The runtimefilter is built in the join node.
 /// The main purpose is to reduce the scanning amount of the
 /// left table data according to the scanning results of the right table during the join process.
@@ -135,7 +142,8 @@ public:
               _is_broadcast_join(true),
               _has_remote_target(false),
               _has_local_target(false),
-              _is_ready(false),
+              _rf_state(RuntimeFilterState::NOT_READY),
+              _rf_state_atomic(RuntimeFilterState::NOT_READY),
               _role(RuntimeFilterRole::PRODUCER),
               _expr_order(-1),
               _always_true(false),
@@ -188,7 +196,16 @@ public:
 
     bool has_remote_target() const { return _has_remote_target; }
 
-    bool is_ready() const { return _is_ready; }
+    bool is_ready() const {
+        return (!_state->enable_pipeline_exec() && _rf_state == RuntimeFilterState::READY) ||
+               (_state->enable_pipeline_exec() &&
+                _rf_state_atomic.load(std::memory_order_acquire) == RuntimeFilterState::READY);
+    }
+    RuntimeFilterState current_state() const {
+        return _state->enable_pipeline_exec() ? _rf_state_atomic.load(std::memory_order_acquire)
+                                              : _rf_state;
+    }
+    bool is_ready_or_timeout();
 
     bool is_producer() const { return _role == RuntimeFilterRole::PRODUCER; }
     bool is_consumer() const { return _role == RuntimeFilterRole::CONSUMER; }
@@ -277,9 +294,25 @@ protected:
 
     std::string _format_status() {
         return fmt::format(
-                "[IsPushDown = {}, IsEffective = {}, IsIgnored = {}, HasRemoteTarget = {}, "
+                "[IsPushDown = {}, RuntimeFilterState = {}, IsIgnored = {}, HasRemoteTarget = {}, "
                 "HasLocalTarget = {}]",
-                _is_push_down, _is_ready, _is_ignored, _has_remote_target, _has_local_target);
+                _is_push_down, _get_explain_state_string(), _is_ignored, _has_remote_target,
+                _has_local_target);
+    }
+
+    std::string _get_explain_state_string() {
+        if (_state->enable_pipeline_exec()) {
+            return _rf_state_atomic.load(std::memory_order_acquire) == RuntimeFilterState::READY
+                           ? "READY"
+                   : _rf_state_atomic.load(std::memory_order_acquire) ==
+                                   RuntimeFilterState::TIME_OUT
+                           ? "TIME_OUT"
+                           : "NOT_READY";
+        } else {
+            return _rf_state == RuntimeFilterState::READY      ? "READY"
+                   : _rf_state == RuntimeFilterState::TIME_OUT ? "TIME_OUT"
+                                                               : "NOT_READY";
+        }
     }
 
     RuntimeState* _state;
@@ -298,7 +331,8 @@ protected:
     // will apply to local node
     bool _has_local_target;
     // filter is ready for consumer
-    bool _is_ready;
+    RuntimeFilterState _rf_state;
+    std::atomic<RuntimeFilterState> _rf_state_atomic;
     // role consumer or producer
     RuntimeFilterRole _role;
     // expr index
diff --git a/be/src/exprs/runtime_filter_rpc.cpp b/be/src/exprs/runtime_filter_rpc.cpp
index ed147db2be..faaee410d0 100644
--- a/be/src/exprs/runtime_filter_rpc.cpp
+++ b/be/src/exprs/runtime_filter_rpc.cpp
@@ -58,6 +58,7 @@ Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress
     pfragment_instance_id->set_lo(state->fragment_instance_id().lo);
 
     _rpc_context->request.set_filter_id(_filter_id);
+    _rpc_context->request.set_is_pipeline(state->enable_pipeline_exec());
     _rpc_context->cntl.set_timeout_ms(1000);
     _rpc_context->cid = _rpc_context->cntl.call_id();
 
diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp
index 770937c8bd..3b78b59e7c 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -50,19 +50,4 @@ std::string OperatorBase::debug_string() const {
     return ss.str();
 }
 
-/////////////////////////////////////// OperatorBuilder ////////////////////////////////////////////////////////////
-
-Status OperatorBuilderBase::prepare(doris::RuntimeState* state) {
-    _state = state;
-    // runtime filter, now dispose by NewOlapScanNode
-    return Status::OK();
-}
-
-void OperatorBuilderBase::close(doris::RuntimeState* state) {
-    if (_is_closed) {
-        return;
-    }
-    _is_closed = true;
-}
-
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 8196c8e1dc..4679832bb1 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -87,10 +87,6 @@ public:
     virtual bool is_sink() const { return false; }
     virtual bool is_source() const { return false; }
 
-    virtual Status prepare(RuntimeState* state);
-
-    virtual void close(RuntimeState* state);
-
     std::string get_name() const { return _name; }
 
     RuntimeState* runtime_state() { return _state; }
@@ -184,6 +180,8 @@ public:
 
     virtual bool can_read() { return false; } // for source
 
+    virtual bool runtime_filters_are_ready_or_timeout() { return true; } // for source
+
     virtual bool can_write() { return false; } // for sink
 
     /**
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index f28c7284fe..f9f2125878 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -24,22 +24,14 @@ namespace doris::pipeline {
 
 OPERATOR_CODE_GENERATOR(ScanOperator, SourceOperator)
 
-Status ScanOperator::open(RuntimeState* state) {
-    SCOPED_TIMER(_runtime_profile->total_time_counter());
-    RETURN_IF_ERROR(SourceOperator::open(state));
-    return _node->open(state);
-}
-
 bool ScanOperator::can_read() {
-    if (_node->_eos || !_node->_scanner_ctx || _node->_scanner_ctx->done() ||
-        _node->_scanner_ctx->can_finish()) {
+    if (_node->_eos || _node->_scanner_ctx->done() || _node->_scanner_ctx->can_finish()) {
         // _eos: need eos
-        // !_scanner_ctx: need call open
         // _scanner_ctx->done(): need finish
         // _scanner_ctx->can_finish(): should be scheduled
         return true;
     } else {
-        return !_node->_scanner_ctx->empty_in_queue(); // have block to process
+        return !_node->_scanner_ctx->empty_in_queue(); // there are some blocks to process
     }
 }
 
@@ -47,10 +39,8 @@ bool ScanOperator::is_pending_finish() const {
     return _node->_scanner_ctx && !_node->_scanner_ctx->can_finish();
 }
 
-Status ScanOperator::close(RuntimeState* state) {
-    RETURN_IF_ERROR(SourceOperator::close(state));
-    _node->close(state);
-    return Status::OK();
+bool ScanOperator::runtime_filters_are_ready_or_timeout() {
+    return _node->runtime_filters_are_ready_or_timeout();
 }
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h
index 964625fa7d..f71a04af8a 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -42,9 +42,7 @@ public:
 
     bool is_pending_finish() const override;
 
-    Status open(RuntimeState* state) override;
-
-    Status close(RuntimeState* state) override;
+    bool runtime_filters_are_ready_or_timeout() override;
 };
 
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index 7c956a5e9d..c5a94ba640 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -21,16 +21,11 @@
 
 namespace doris::pipeline {
 
-Status Pipeline::prepare(RuntimeState* state) {
+void Pipeline::_init_profile() {
     std::stringstream ss;
     ss << "Pipeline"
        << " (pipeline id=" << _pipeline_id << ")";
     _pipeline_profile.reset(new RuntimeProfile(ss.str()));
-    for (auto& op : _operator_builders) {
-        RETURN_IF_ERROR(op->prepare(state));
-    }
-    RETURN_IF_ERROR(_sink->prepare(state));
-    return Status::OK();
 }
 
 Status Pipeline::build_operators(Operators& operators) {
@@ -46,13 +41,6 @@ Status Pipeline::build_operators(Operators& operators) {
     return Status::OK();
 }
 
-void Pipeline::close(RuntimeState* state) {
-    for (auto& op : _operator_builders) {
-        op->close(state);
-    }
-    _sink->close(state);
-}
-
 Status Pipeline::add_operator(OperatorBuilderPtr& op) {
     if (_operator_builders.empty() && !op->is_source()) {
         return Status::InternalError("Should set source before other operator");
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 595f16ba63..802c169d96 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -37,21 +37,17 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
 public:
     Pipeline() = delete;
     explicit Pipeline(PipelineId pipeline_id, std::weak_ptr<PipelineFragmentContext> context)
-            : _complete_dependency(0), _pipeline_id(pipeline_id), _context(context) {}
-
-    Status prepare(RuntimeState* state);
-
-    void close(RuntimeState*);
+            : _complete_dependency(0), _pipeline_id(pipeline_id), _context(context) {
+        _init_profile();
+    }
 
     void add_dependency(std::shared_ptr<Pipeline>& pipeline) {
         pipeline->_parents.push_back(shared_from_this());
         _dependencies.push_back(pipeline);
     }
 
-    // If all dependency be finished, the pipeline task shoule be scheduled
-    // pipeline is finish must call the parents `finish_one_dependency`
-    // like the condition variables.
-    // Eg: hash build finish must call the hash probe the method
+    // If all dependencies are finished, this pipeline task should be scheduled.
+    // e.g. Hash join probe task will be scheduled once Hash join build task is finished.
     bool finish_one_dependency() {
         DCHECK(_complete_dependency < _dependencies.size());
         return _complete_dependency.fetch_add(1) == _dependencies.size() - 1;
@@ -70,6 +66,7 @@ public:
     RuntimeProfile* runtime_profile() { return _pipeline_profile.get(); }
 
 private:
+    void _init_profile();
     std::atomic<uint32_t> _complete_dependency;
 
     OperatorBuilders _operator_builders; // left is _source, right is _root
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index ecb79bb2d0..d87afb8321 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -272,10 +272,6 @@ Status PipelineFragmentContext::prepare(const doris::TExecPlanFragmentParams& re
 
 Status PipelineFragmentContext::_build_pipeline_tasks(
         const doris::TExecPlanFragmentParams& request) {
-    for (auto& pipeline : _pipelines) {
-        RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
-    }
-
     for (PipelinePtr& pipeline : _pipelines) {
         // if sink
         auto sink = pipeline->sink()->build_operator();
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 5302cc83b5..5bc58145c0 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -77,6 +77,11 @@ public:
 
     std::string to_http_path(const std::string& file_name);
 
+    void set_merge_controller_handler(
+            std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
+        _merge_controller_handler = handler;
+    }
+
     void send_report(bool);
 
 private:
@@ -118,6 +123,7 @@ private:
 
     // If set the true, this plan fragment will be executed only after FE send execution start rpc.
     bool _need_wait_execution_trigger = false;
+    std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler;
 
     MonotonicStopWatch _fragment_watcher;
     //    RuntimeProfile::Counter* _start_timer;
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 250dfc2bab..96fbd40f90 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -47,23 +47,13 @@ Status PipelineTask::prepare(RuntimeState* state) {
         RETURN_IF_ERROR(o->prepare(state));
     }
     _block.reset(new doris::vectorized::Block());
-    _init_state();
+
+    // We should make sure initial state for task are runnable so that we can do some preparation jobs (e.g. initialize runtime filters).
+    set_state(RUNNABLE);
     _prepared = true;
     return Status::OK();
 }
 
-void PipelineTask::_init_state() {
-    if (has_dependency()) {
-        set_state(BLOCKED_FOR_DEPENDENCY);
-    } else if (!(_source->can_read())) {
-        set_state(BLOCKED_FOR_SOURCE);
-    } else if (!(_sink->can_write())) {
-        set_state(BLOCKED_FOR_SINK);
-    } else {
-        set_state(RUNNABLE);
-    }
-}
-
 bool PipelineTask::has_dependency() {
     if (_dependency_finish) {
         return false;
@@ -86,12 +76,12 @@ bool PipelineTask::has_dependency() {
 }
 
 Status PipelineTask::open() {
-    if (_sink) {
-        RETURN_IF_ERROR(_sink->open(_state));
-    }
     for (auto& o : _operators) {
         RETURN_IF_ERROR(o->open(_state));
     }
+    if (_sink) {
+        RETURN_IF_ERROR(_sink->open(_state));
+    }
     _opened = true;
     return Status::OK();
 }
@@ -103,6 +93,19 @@ Status PipelineTask::execute(bool* eos) {
     // The status must be runnable
     *eos = false;
     if (!_opened) {
+        {
+            SCOPED_RAW_TIMER(&time_spent);
+            auto st = open();
+            if (st.is_blocked_by_rf()) {
+                set_state(BLOCKED_FOR_RF);
+                return Status::OK();
+            }
+            RETURN_IF_ERROR(st);
+        }
+        if (has_dependency()) {
+            set_state(BLOCKED_FOR_DEPENDENCY);
+            return Status::OK();
+        }
         if (!_source->can_read()) {
             set_state(BLOCKED_FOR_SOURCE);
             return Status::OK();
@@ -111,8 +114,6 @@ Status PipelineTask::execute(bool* eos) {
             set_state(BLOCKED_FOR_SINK);
             return Status::OK();
         }
-        SCOPED_RAW_TIMER(&time_spent);
-        RETURN_IF_ERROR(open());
     }
 
     while (!_fragment_context->is_canceled()) {
@@ -168,7 +169,6 @@ Status PipelineTask::close() {
         COUNTER_UPDATE(_wait_worker_timer, _wait_worker_watcher.elapsed_time());
         COUNTER_UPDATE(_wait_schedule_timer, _wait_schedule_watcher.elapsed_time());
     }
-    _pipeline->close(_state);
     return s;
 }
 
@@ -204,14 +204,14 @@ void PipelineTask::set_state(PipelineTaskState state) {
 }
 
 std::string PipelineTask::debug_string() const {
-    std::stringstream ss;
-    ss << "PipelineTask(" << _index << ")" << get_state_name(_cur_state) << "\nsink: ";
-    ss << _sink->debug_string();
-    ss << "\n operators(from source to root)";
-    for (auto operatr : _operators) {
-        ss << "\n" << operatr->debug_string();
+    fmt::memory_buffer debug_string_buffer;
+    fmt::format_to(debug_string_buffer, "PipelineTask[id = {}, state = {}]\noperators: ", _index,
+                   get_state_name(_cur_state));
+    for (size_t i = 0; i < _operators.size(); i++) {
+        fmt::format_to(debug_string_buffer, "\n{}{}", std::string(i * 2, ' '),
+                       _operators[i]->debug_string());
     }
-    return ss.str();
+    return fmt::to_string(debug_string_buffer);
 }
 
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 34cce073e6..213aaf2507 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -57,7 +57,8 @@ enum PipelineTaskState : uint8_t {
     PENDING_FINISH =
             5, // compute task is over, but still hold resource. like some scan and sink task
     FINISHED = 6,
-    CANCELED = 7
+    CANCELED = 7,
+    BLOCKED_FOR_RF = 8,
 };
 
 inline const char* get_state_name(PipelineTaskState idx) {
@@ -78,6 +79,8 @@ inline const char* get_state_name(PipelineTaskState idx) {
         return "FINISHED";
     case PipelineTaskState::CANCELED:
         return "CANCELED";
+    case PipelineTaskState::BLOCKED_FOR_RF:
+        return "BLOCKED_FOR_RF";
     }
     __builtin_unreachable();
 }
@@ -117,21 +120,15 @@ public:
 
     PipelineTaskState get_state() { return _cur_state; }
     void set_state(PipelineTaskState state);
-    bool is_blocking_state() {
-        switch (_cur_state) {
-        case BLOCKED_FOR_DEPENDENCY:
-        case BLOCKED_FOR_SOURCE:
-        case BLOCKED_FOR_SINK:
-            return true;
-        default:
-            return false;
-        }
-    }
 
     bool is_pending_finish() { return _source->is_pending_finish() || _sink->is_pending_finish(); }
 
     bool source_can_read() { return _source->can_read(); }
 
+    bool runtime_filters_are_ready_or_timeout() {
+        return _source->runtime_filters_are_ready_or_timeout();
+    }
+
     bool sink_can_write() { return _sink->can_write(); }
 
     Status finalize();
@@ -165,7 +162,6 @@ public:
 private:
     Status open();
     void _init_profile();
-    void _init_state();
 
     uint32_t _index;
     PipelinePtr _pipeline;
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 18e5ce2449..c74be19f0d 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -120,6 +120,12 @@ void BlockedTaskScheduler::_schedule() {
                 } else {
                     iter++;
                 }
+            } else if (state == BLOCKED_FOR_RF) {
+                if (task->runtime_filters_are_ready_or_timeout()) {
+                    _make_task_run(local_blocked_tasks, iter, ready_tasks);
+                } else {
+                    iter++;
+                }
             } else if (state == BLOCKED_FOR_SINK) {
                 if (task->sink_can_write()) {
                     _make_task_run(local_blocked_tasks, iter, ready_tasks);
@@ -193,11 +199,7 @@ Status TaskScheduler::start() {
 }
 
 Status TaskScheduler::schedule_task(PipelineTask* task) {
-    if (task->is_blocking_state()) {
-        _blocked_task_scheduler->add_blocked_task(task);
-    } else {
-        _task_queue->push_back(task);
-    }
+    _task_queue->push_back(task);
     // TODO control num of task
     return Status::OK();
 }
@@ -225,8 +227,7 @@ void TaskScheduler::_do_work(size_t index) {
 
         auto check_state = task->get_state();
         if (check_state == PENDING_FINISH) {
-            bool is_pending = task->is_pending_finish();
-            DCHECK(!is_pending) << "must not pending close " << task->debug_string();
+            DCHECK(!task->is_pending_finish()) << "must not pending close " << task->debug_string();
             _try_close_task(task, canceled ? CANCELED : FINISHED);
             continue;
         }
@@ -245,7 +246,8 @@ void TaskScheduler::_do_work(size_t index) {
         auto status = task->execute(&eos);
         task->set_previous_core_id(index);
         if (!status.ok()) {
-            LOG(WARNING) << "Pipeline taks execute task fail " << task->debug_string();
+            LOG(WARNING) << "Pipeline task execute task fail " << task->debug_string() << " "
+                         << status.to_string();
             // exec failed,cancel all fragment instance
             fragment_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "execute fail");
             _try_close_task(task, CANCELED);
@@ -272,6 +274,8 @@ void TaskScheduler::_do_work(size_t index) {
         switch (pipeline_state) {
         case BLOCKED_FOR_SOURCE:
         case BLOCKED_FOR_SINK:
+        case BLOCKED_FOR_RF:
+        case BLOCKED_FOR_DEPENDENCY:
             _blocked_task_scheduler->add_blocked_task(task);
             break;
         case RUNNABLE:
@@ -295,6 +299,11 @@ void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state)
         if (!status.ok()) {
             // TODO: LOG warning
         }
+        if (task->is_pending_finish()) {
+            task->set_state(PENDING_FINISH);
+            _blocked_task_scheduler->add_blocked_task(task);
+            return;
+        }
         task->set_state(state);
         // TODO: rethink the logic
         if (state == CANCELED) {
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index b0d00fb99f..0037ec255a 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -714,7 +714,6 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
             RETURN_IF_ERROR(exec_state->prepare(params));
         }
         g_fragmentmgr_prepare_latency << (duration_ns / 1000);
-        // TODO: Support runtime filter on pipeline engine
         std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
         _runtimefilter_controller.add_entity(params, &handler,
                                              exec_state->executor()->runtime_state());
@@ -757,6 +756,10 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
             g_fragmentmgr_prepare_latency << (duration_ns / 1000);
         }
 
+        std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
+        _runtimefilter_controller.add_entity(params, &handler, context->get_runtime_state());
+        context->set_merge_controller_handler(handler);
+
         {
             std::lock_guard<std::mutex> lock(_lock);
             _pipeline_map.insert(std::make_pair(fragment_instance_id, context));
@@ -1028,12 +1031,34 @@ Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params,
 
 Status FragmentMgr::apply_filter(const PPublishFilterRequest* request,
                                  butil::IOBufAsZeroCopyInputStream* attach_data) {
-    // TODO pipeline runtime filter
+    bool is_pipeline = request->has_is_pipeline() && request->is_pipeline();
+
     UniqueId fragment_instance_id = request->fragment_id();
     TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
+
     std::shared_ptr<FragmentExecState> fragment_state;
+    std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
 
-    {
+    RuntimeFilterMgr* runtime_filter_mgr = nullptr;
+    if (is_pipeline) {
+        std::unique_lock<std::mutex> lock(_lock);
+
+        if (!_pipeline_map.count(tfragment_instance_id)) {
+            VLOG_NOTICE << "wait for fragment start execute, fragment-id:" << fragment_instance_id;
+            _cv.wait_for(lock, std::chrono::milliseconds(1000),
+                         [&] { return _pipeline_map.count(tfragment_instance_id); });
+        }
+
+        auto iter = _pipeline_map.find(tfragment_instance_id);
+        if (iter == _pipeline_map.end()) {
+            VLOG_CRITICAL << "unknown.... fragment-id:" << fragment_instance_id;
+            return Status::InvalidArgument("fragment-id: {}", fragment_instance_id.to_string());
+        }
+        pip_context = iter->second;
+
+        DCHECK(pip_context != nullptr);
+        runtime_filter_mgr = pip_context->get_runtime_state()->runtime_filter_mgr();
+    } else {
         std::unique_lock<std::mutex> lock(_lock);
         if (!_fragment_map.count(tfragment_instance_id)) {
             VLOG_NOTICE << "wait for fragment start execute, fragment-id:" << fragment_instance_id;
@@ -1047,26 +1072,37 @@ Status FragmentMgr::apply_filter(const PPublishFilterRequest* request,
             return Status::InvalidArgument("fragment-id: {}", fragment_instance_id.to_string());
         }
         fragment_state = iter->second;
-    }
 
-    DCHECK(fragment_state != nullptr);
-    RuntimeFilterMgr* runtime_filter_mgr =
-            fragment_state->executor()->runtime_state()->runtime_filter_mgr();
+        DCHECK(fragment_state != nullptr);
+        runtime_filter_mgr = fragment_state->executor()->runtime_state()->runtime_filter_mgr();
+    }
 
-    Status st = runtime_filter_mgr->update_filter(request, attach_data);
-    return st;
+    return runtime_filter_mgr->update_filter(request, attach_data);
 }
 
 Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
                                  butil::IOBufAsZeroCopyInputStream* attach_data) {
     UniqueId queryid = request->query_id();
+    bool is_pipeline = request->has_is_pipeline() && request->is_pipeline();
     std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
     RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, &filter_controller));
 
     auto fragment_instance_id = filter_controller->instance_id();
     TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
     std::shared_ptr<FragmentExecState> fragment_state;
-    {
+    std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
+    if (is_pipeline) {
+        std::lock_guard<std::mutex> lock(_lock);
+        auto iter = _pipeline_map.find(tfragment_instance_id);
+        if (iter == _pipeline_map.end()) {
+            VLOG_CRITICAL << "unknown fragment-id:" << fragment_instance_id;
+            return Status::InvalidArgument("fragment-id: {}", fragment_instance_id.to_string());
+        }
+
+        // hold reference to pip_context, or else runtime_state can be destroyed
+        // when filter_controller->merge is still in progress
+        pip_context = iter->second;
+    } else {
         std::unique_lock<std::mutex> lock(_lock);
         auto iter = _fragment_map.find(tfragment_instance_id);
         if (iter == _fragment_map.end()) {
diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp
index ce851c119a..bf0bfd0d9d 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -238,6 +238,8 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
             size_t cur = rpc_contexts.size() - 1;
             rpc_contexts[cur]->request = apply_request;
             rpc_contexts[cur]->request.set_filter_id(request->filter_id());
+            rpc_contexts[cur]->request.set_is_pipeline(request->has_is_pipeline() &&
+                                                       request->is_pipeline());
             *rpc_contexts[cur]->request.mutable_query_id() = request->query_id();
             if (has_attachment) {
                 rpc_contexts[cur]->cntl.request_attachment().append(request_attachment);
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 5af2f7df55..f01156e893 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -323,7 +323,10 @@ public:
         }
         return _query_options.be_exec_version;
     }
-    bool enable_pipeline_exec() const { return _query_options.enable_pipeline_engine; }
+    bool enable_pipeline_exec() const {
+        return _query_options.__isset.enable_pipeline_engine &&
+               _query_options.enable_pipeline_engine;
+    }
 
     bool trim_tailing_spaces_for_external_table_query() const {
         return _query_options.trim_tailing_spaces_for_external_table_query;
diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp
index 9db08c937c..cf017da7c6 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -84,13 +84,22 @@ Status VScanNode::prepare(RuntimeState* state) {
 
 Status VScanNode::open(RuntimeState* state) {
     SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
-    _input_tuple_desc = state->desc_tbl().get_tuple_descriptor(_input_tuple_id);
-    _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
     START_AND_SCOPE_SPAN(state->get_tracer(), span, "VScanNode::open");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_CANCELLED(state);
-    RETURN_IF_ERROR(ExecNode::open(state));
+    return ExecNode::open(state);
+}
 
+Status VScanNode::alloc_resource(RuntimeState* state) {
+    if (_opened) {
+        return Status::OK();
+    }
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
+    _input_tuple_desc = state->desc_tbl().get_tuple_descriptor(_input_tuple_id);
+    _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VScanNode::alloc_resource");
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    RETURN_IF_ERROR(ExecNode::alloc_resource(state));
     RETURN_IF_ERROR(_acquire_runtime_filter());
     RETURN_IF_ERROR(_process_conjuncts());
 
@@ -102,6 +111,8 @@ Status VScanNode::open(RuntimeState* state) {
         COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size()));
         RETURN_IF_ERROR(_start_scanners(scanners));
     }
+    RETURN_IF_CANCELLED(state);
+    _opened = true;
     return Status::OK();
 }
 
@@ -213,6 +224,20 @@ Status VScanNode::_register_runtime_filter() {
     return Status::OK();
 }
 
+bool VScanNode::runtime_filters_are_ready_or_timeout() {
+    if (!_blocked_by_rf) {
+        return true;
+    }
+    for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
+        IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
+        if (!runtime_filter->is_ready_or_timeout()) {
+            return false;
+        }
+    }
+    _blocked_by_rf = false;
+    return true;
+}
+
 Status VScanNode::_acquire_runtime_filter(bool wait) {
     SCOPED_TIMER(_acquire_runtime_filter_timer);
     std::vector<VExpr*> vexprs;
@@ -229,14 +254,23 @@ Status VScanNode::_acquire_runtime_filter(bool wait) {
         if (!ready && wait) {
             ready = runtime_filter->await();
         }
-        if (ready) {
+        if (ready && !_runtime_filter_ctxs[i].apply_mark) {
             RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&vexprs));
             _runtime_filter_ctxs[i].apply_mark = true;
-        } else {
+        } else if ((wait || !runtime_filter->is_ready_or_timeout()) &&
+                   runtime_filter->current_state() == RuntimeFilterState::NOT_READY &&
+                   !_runtime_filter_ctxs[i].apply_mark) {
+            _blocked_by_rf = true;
+        } else if (!_runtime_filter_ctxs[i].apply_mark) {
+            DCHECK(!_blocked_by_rf &&
+                   runtime_filter->current_state() != RuntimeFilterState::NOT_READY);
             _is_all_rf_applied = false;
         }
     }
     RETURN_IF_ERROR(_append_rf_into_conjuncts(vexprs));
+    if (_blocked_by_rf) {
+        return Status::WaitForRf("Runtime filters are neither not ready nor timeout");
+    }
 
     return Status::OK();
 }
@@ -304,6 +338,12 @@ Status VScanNode::close(RuntimeState* state) {
         return Status::OK();
     }
     START_AND_SCOPE_SPAN(state->get_tracer(), span, "VScanNode::close");
+    RETURN_IF_ERROR(ExecNode::close(state));
+    return Status::OK();
+}
+
+void VScanNode::release_resource(RuntimeState* state) {
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VScanNode::release_resource");
     if (_scanner_ctx.get()) {
         // stop and wait the scanner scheduler to be done
         // _scanner_ctx may not be created for some short circuit case.
@@ -321,8 +361,7 @@ Status VScanNode::close(RuntimeState* state) {
     }
     _scanner_pool.clear();
 
-    RETURN_IF_ERROR(ExecNode::close(state));
-    return Status::OK();
+    ExecNode::release_resource(state);
 }
 
 Status VScanNode::_normalize_conjuncts() {
diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h
index 3e6132bc29..2f6e5c6b4a 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -91,6 +91,10 @@ public:
     const TupleDescriptor* input_tuple_desc() const { return _input_tuple_desc; }
     const TupleDescriptor* output_tuple_desc() const { return _output_tuple_desc; }
 
+    Status alloc_resource(RuntimeState* state) override;
+    void release_resource(RuntimeState* state) override;
+    bool runtime_filters_are_ready_or_timeout();
+
     enum class PushDownType {
         // The predicate can not be pushed down to data source
         UNACCEPTABLE,
@@ -202,6 +206,7 @@ protected:
 
     // indicate this scan node has no more data to return
     bool _eos = false;
+    bool _opened = false;
 
     FilterPredicates _filter_predicates {};
 
@@ -223,6 +228,7 @@ protected:
     std::vector<ColumnValueRangeType> _not_in_value_ranges;
 
     bool _need_agg_finalize = true;
+    bool _blocked_by_rf = false;
 
     // Every time vconjunct_ctx_ptr is updated, the old ctx will be stored in this vector
     // so that it will be destroyed uniformly at the end of the query.
diff --git a/be/src/vec/exec/vexchange_node.cpp b/be/src/vec/exec/vexchange_node.cpp
index ba73754404..8e34b1e444 100644
--- a/be/src/vec/exec/vexchange_node.cpp
+++ b/be/src/vec/exec/vexchange_node.cpp
@@ -31,6 +31,7 @@ VExchangeNode::VExchangeNode(ObjectPool* pool, const TPlanNode& tnode, const Des
         : ExecNode(pool, tnode, descs),
           _num_senders(0),
           _is_merging(tnode.exchange_node.__isset.sort_info),
+          _is_ready(false),
           _stream_recvr(nullptr),
           _input_row_desc(descs, tnode.exchange_node.input_row_tuples,
                           std::vector<bool>(tnode.nullable_tuples.begin(),
@@ -70,9 +71,11 @@ Status VExchangeNode::alloc_resource(RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::alloc_resource(state));
     if (_is_merging) {
         RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
-        RETURN_IF_ERROR(_stream_recvr->create_merger(_vsort_exec_exprs.lhs_ordering_expr_ctxs(),
-                                                     _is_asc_order, _nulls_first,
-                                                     state->batch_size(), _limit, _offset));
+        if (!state->enable_pipeline_exec()) {
+            RETURN_IF_ERROR(_stream_recvr->create_merger(_vsort_exec_exprs.lhs_ordering_expr_ctxs(),
+                                                         _is_asc_order, _nulls_first,
+                                                         state->batch_size(), _limit, _offset));
+        }
     }
     return Status::OK();
 }
@@ -93,6 +96,13 @@ Status VExchangeNode::get_next(RuntimeState* state, Block* block, bool* eos) {
     INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VExchangeNode::get_next");
     SCOPED_TIMER(runtime_profile()->total_time_counter());
     SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
+    if (_is_merging && state->enable_pipeline_exec() && !_is_ready) {
+        RETURN_IF_ERROR(_stream_recvr->create_merger(_vsort_exec_exprs.lhs_ordering_expr_ctxs(),
+                                                     _is_asc_order, _nulls_first,
+                                                     state->batch_size(), _limit, _offset));
+        _is_ready = true;
+        return Status::OK();
+    }
     auto status = _stream_recvr->get_next(block, eos);
     if (block != nullptr) {
         if (_num_rows_returned + block->rows() < _limit) {
diff --git a/be/src/vec/exec/vexchange_node.h b/be/src/vec/exec/vexchange_node.h
index dc633cafd0..1d767c8cb2 100644
--- a/be/src/vec/exec/vexchange_node.h
+++ b/be/src/vec/exec/vexchange_node.h
@@ -51,6 +51,7 @@ public:
 private:
     int _num_senders;
     bool _is_merging;
+    bool _is_ready;
     std::shared_ptr<VDataStreamRecvr> _stream_recvr;
     RowDescriptor _input_row_desc;
     std::shared_ptr<QueryStatisticsRecvr> _sub_plan_query_statistics_recvr;
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h
index 386b8ef41f..e8b70bbeca 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -217,7 +217,9 @@ public:
     void _update_block_queue_empty() override { _block_queue_empty = _block_queue.empty(); }
 
     Status get_batch(Block** next_block) override {
-        CHECK(!should_wait());
+        CHECK(!should_wait()) << " _is_cancelled: " << _is_cancelled
+                              << ", _block_queue_empty: " << _block_queue_empty
+                              << ", _num_remaining_senders: " << _num_remaining_senders;
         std::lock_guard<std::mutex> l(_lock); // protect _block_queue
         return _inner_get_batch(next_block);
     }
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index bf61b22707..ff05509e67 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -438,6 +438,7 @@ message PMergeFilterRequest {
     optional PMinMaxFilter minmax_filter = 5;
     optional PBloomFilter bloom_filter = 6;
     optional PInFilter in_filter = 7;
+    optional bool is_pipeline = 8;
 };
 
 message PMergeFilterResponse {
@@ -452,6 +453,7 @@ message PPublishFilterRequest {
     optional PMinMaxFilter minmax_filter = 5;
     optional PBloomFilter bloom_filter = 6;
     optional PInFilter in_filter = 7;
+    optional bool is_pipeline = 8;
 };
 
 message PPublishFilterResponse {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org