You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@doris.apache.org by ga...@apache.org on 2022/12/18 13:48:06 UTC
[doris] branch master updated: [Pipeline](runtime filter) Support runtime filters on pipeline engine (#15040)
This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 13bc8c2ef8 [Pipeline](runtime filter) Support runtime filters on pipeline engine (#15040)
13bc8c2ef8 is described below
commit 13bc8c2ef89fe598adf329a3c1a52c7569f81036
Author: Gabriel <ga...@gmail.com>
AuthorDate: Sun Dec 18 21:48:00 2022 +0800
[Pipeline](runtime filter) Support runtime filters on pipeline engine (#15040)
---
be/src/common/status.h | 4 +
be/src/exprs/runtime_filter.cpp | 101 ++++++++++++++++++++++----
be/src/exprs/runtime_filter.h | 44 +++++++++--
be/src/exprs/runtime_filter_rpc.cpp | 1 +
be/src/pipeline/exec/operator.cpp | 15 ----
be/src/pipeline/exec/operator.h | 6 +-
be/src/pipeline/exec/scan_operator.cpp | 18 +----
be/src/pipeline/exec/scan_operator.h | 4 +-
be/src/pipeline/pipeline.cpp | 14 +---
be/src/pipeline/pipeline.h | 15 ++--
be/src/pipeline/pipeline_fragment_context.cpp | 4 -
be/src/pipeline/pipeline_fragment_context.h | 6 ++
be/src/pipeline/pipeline_task.cpp | 52 ++++++-------
be/src/pipeline/pipeline_task.h | 20 ++---
be/src/pipeline/task_scheduler.cpp | 25 +++++--
be/src/runtime/fragment_mgr.cpp | 56 +++++++++++---
be/src/runtime/runtime_filter_mgr.cpp | 2 +
be/src/runtime/runtime_state.h | 5 +-
be/src/vec/exec/scan/vscan_node.cpp | 53 ++++++++++++--
be/src/vec/exec/scan/vscan_node.h | 6 ++
be/src/vec/exec/vexchange_node.cpp | 16 +++-
be/src/vec/exec/vexchange_node.h | 1 +
be/src/vec/runtime/vdata_stream_recvr.h | 4 +-
gensrc/proto/internal_service.proto | 2 +
24 files changed, 323 insertions(+), 151 deletions(-)
diff --git a/be/src/common/status.h b/be/src/common/status.h
index 0cfc4517be..0970447a45 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -246,6 +246,7 @@ E(ROWSET_RENAME_FILE_FAILED, -3116);
E(SEGCOMPACTION_INIT_READER, -3117);
E(SEGCOMPACTION_INIT_WRITER, -3118);
E(SEGCOMPACTION_FAILED, -3119);
+E(PIP_WAIT_FOR_RF, -3120);
#undef E
}; // namespace ErrorCode
@@ -367,6 +368,7 @@ public:
ERROR_CTOR(NotSupported, NOT_IMPLEMENTED_ERROR)
ERROR_CTOR(EndOfFile, END_OF_FILE)
ERROR_CTOR(InternalError, INTERNAL_ERROR)
+ ERROR_CTOR(WaitForRf, PIP_WAIT_FOR_RF)
ERROR_CTOR(RuntimeError, RUNTIME_ERROR)
ERROR_CTOR(Cancelled, CANCELLED)
ERROR_CTOR(MemoryLimitExceeded, MEM_LIMIT_EXCEEDED)
@@ -386,6 +388,8 @@ public:
bool ok() const { return _code == ErrorCode::OK; }
+ bool is_blocked_by_rf() const { return _code == ErrorCode::PIP_WAIT_FOR_RF; }
+
bool is_io_error() const {
return ErrorCode::IO_ERROR == _code || ErrorCode::READ_UNENOUGH == _code ||
ErrorCode::CHECKSUM_ERROR == _code || ErrorCode::FILE_DATA_ERROR == _code ||
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index d584f06d17..a2d7c6118c 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -36,7 +36,6 @@
#include "runtime/large_int_value.h"
#include "runtime/primitive_type.h"
#include "runtime/runtime_filter_mgr.h"
-#include "runtime/runtime_state.h"
#include "util/runtime_profile.h"
#include "util/string_parser.hpp"
#include "vec/columns/column.h"
@@ -1205,7 +1204,7 @@ Status IRuntimeFilter::get_prepared_context(std::vector<ExprContext*>* push_expr
if (_is_ignored) {
return Status::OK();
}
- DCHECK(_is_ready);
+ DCHECK(!_state->enable_pipeline_exec() && _rf_state == RuntimeFilterState::READY);
DCHECK(is_consumer());
std::lock_guard<std::mutex> guard(_inner_mutex);
@@ -1225,7 +1224,9 @@ Status IRuntimeFilter::get_prepared_vexprs(std::vector<doris::vectorized::VExpr*
if (_is_ignored) {
return Status::OK();
}
- DCHECK(_is_ready);
+ DCHECK((!_state->enable_pipeline_exec() && _rf_state == RuntimeFilterState::READY) ||
+ (_state->enable_pipeline_exec() &&
+ _rf_state_atomic.load(std::memory_order_acquire) == RuntimeFilterState::READY));
DCHECK(is_consumer());
std::lock_guard<std::mutex> guard(_inner_mutex);
@@ -1239,29 +1240,95 @@ Status IRuntimeFilter::get_prepared_vexprs(std::vector<doris::vectorized::VExpr*
bool IRuntimeFilter::await() {
DCHECK(is_consumer());
- SCOPED_TIMER(_await_time_cost);
// bitmap filter is precise filter and only filter once, so it must be applied.
int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER
? _state->query_options().query_timeout
: _state->runtime_filter_wait_time_ms();
- std::unique_lock<std::mutex> lock(_inner_mutex);
- if (!_is_ready) {
- int64_t ms_since_registration = MonotonicMillis() - registration_time_;
- int64_t ms_remaining = wait_times_ms - ms_since_registration;
- if (ms_remaining <= 0) {
- return _is_ready;
+ if (_state->enable_pipeline_exec()) {
+ auto expected = _rf_state_atomic.load(std::memory_order_acquire);
+ if (expected == RuntimeFilterState::NOT_READY) {
+ if (!_rf_state_atomic.compare_exchange_strong(
+ expected,
+ MonotonicMillis() - registration_time_ >= wait_times_ms
+ ? RuntimeFilterState::TIME_OUT
+ : RuntimeFilterState::NOT_READY,
+ std::memory_order_acq_rel)) {
+ DCHECK(expected == RuntimeFilterState::READY);
+ return true;
+ }
+ return false;
+ } else if (expected == RuntimeFilterState::TIME_OUT) {
+ return false;
+ }
+ } else {
+ SCOPED_TIMER(_await_time_cost);
+ std::unique_lock<std::mutex> lock(_inner_mutex);
+ if (_rf_state != RuntimeFilterState::READY) {
+ int64_t ms_since_registration = MonotonicMillis() - registration_time_;
+ int64_t ms_remaining = wait_times_ms - ms_since_registration;
+ _rf_state = RuntimeFilterState::TIME_OUT;
+ if (ms_remaining <= 0) {
+ return false;
+ }
+ return _inner_cv.wait_for(lock, std::chrono::milliseconds(ms_remaining),
+ [this] { return _rf_state == RuntimeFilterState::READY; });
}
- return _inner_cv.wait_for(lock, std::chrono::milliseconds(ms_remaining),
- [this] { return this->_is_ready; });
}
return true;
}
+bool IRuntimeFilter::is_ready_or_timeout() {
+ DCHECK(is_consumer());
+ auto cur_state = _rf_state_atomic.load(std::memory_order_acquire);
+ // bitmap filter is precise filter and only filter once, so it must be applied.
+ int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER
+ ? _state->query_options().query_timeout
+ : _state->runtime_filter_wait_time_ms();
+ int64_t ms_since_registration = MonotonicMillis() - registration_time_;
+ if (!_state->enable_pipeline_exec()) {
+ _rf_state = RuntimeFilterState::TIME_OUT;
+ return true;
+ } else if (is_ready()) {
+ if (cur_state == RuntimeFilterState::NOT_READY) {
+ _profile->add_info_string("EffectTime", std::to_string(ms_since_registration) + " ms");
+ }
+ return true;
+ } else {
+ if (cur_state == RuntimeFilterState::NOT_READY) {
+ _profile->add_info_string("EffectTime", std::to_string(ms_since_registration) + " ms");
+ }
+ if (is_ready()) {
+ return true;
+ }
+ bool timeout = wait_times_ms <= ms_since_registration;
+ auto expected = RuntimeFilterState::NOT_READY;
+ if (timeout) {
+ if (!_rf_state_atomic.compare_exchange_strong(expected, RuntimeFilterState::TIME_OUT,
+ std::memory_order_acq_rel)) {
+ DCHECK(expected == RuntimeFilterState::READY ||
+ expected == RuntimeFilterState::TIME_OUT);
+ return true;
+ }
+ return true;
+ }
+ if (!_rf_state_atomic.compare_exchange_strong(expected, RuntimeFilterState::NOT_READY,
+ std::memory_order_acq_rel)) {
+ DCHECK(expected == RuntimeFilterState::READY);
+ return true;
+ }
+ return false;
+ }
+}
+
void IRuntimeFilter::signal() {
DCHECK(is_consumer());
- std::unique_lock<std::mutex> lock(_inner_mutex);
- _is_ready = true;
- _inner_cv.notify_all();
+ if (_state->enable_pipeline_exec()) {
+ _rf_state_atomic.store(RuntimeFilterState::READY);
+ } else {
+ std::unique_lock<std::mutex> lock(_inner_mutex);
+ _rf_state = RuntimeFilterState::READY;
+ _inner_cv.notify_all();
+ }
if (_wrapper->get_real_type() == RuntimeFilterType::IN_FILTER) {
_profile->add_info_string("InFilterSize", std::to_string(_wrapper->get_in_filter_size()));
@@ -1412,7 +1479,9 @@ void IRuntimeFilter::init_profile(RuntimeProfile* parent_profile) {
_profile.reset(new RuntimeProfile(fmt::format("RuntimeFilter: (id = {}, type = {})", _filter_id,
::doris::to_string(_runtime_filter_type))));
parent_profile->add_child(_profile.get(), true, nullptr);
- _await_time_cost = ADD_TIMER(_profile, "AWaitTimeCost");
+ if (!_state->enable_pipeline_exec()) {
+ _await_time_cost = ADD_TIMER(_profile, "AWaitTimeCost");
+ }
_profile->add_info_string("Info", _format_status());
if (_runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
update_runtime_filter_type_to_profile();
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 5cd08aca46..e1a4b236e3 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -18,6 +18,7 @@
#pragma once
#include "exprs/expr_context.h"
+#include "runtime/runtime_state.h"
#include "util/runtime_profile.h"
#include "util/time.h"
#include "util/uid_util.h"
@@ -120,6 +121,12 @@ struct MergeRuntimeFilterParams {
butil::IOBufAsZeroCopyInputStream* data;
};
+enum RuntimeFilterState {
+ READY,
+ NOT_READY,
+ TIME_OUT,
+};
+
/// The runtimefilter is built in the join node.
/// The main purpose is to reduce the scanning amount of the
/// left table data according to the scanning results of the right table during the join process.
@@ -135,7 +142,8 @@ public:
_is_broadcast_join(true),
_has_remote_target(false),
_has_local_target(false),
- _is_ready(false),
+ _rf_state(RuntimeFilterState::NOT_READY),
+ _rf_state_atomic(RuntimeFilterState::NOT_READY),
_role(RuntimeFilterRole::PRODUCER),
_expr_order(-1),
_always_true(false),
@@ -188,7 +196,16 @@ public:
bool has_remote_target() const { return _has_remote_target; }
- bool is_ready() const { return _is_ready; }
+ bool is_ready() const {
+ return (!_state->enable_pipeline_exec() && _rf_state == RuntimeFilterState::READY) ||
+ (_state->enable_pipeline_exec() &&
+ _rf_state_atomic.load(std::memory_order_acquire) == RuntimeFilterState::READY);
+ }
+ RuntimeFilterState current_state() const {
+ return _state->enable_pipeline_exec() ? _rf_state_atomic.load(std::memory_order_acquire)
+ : _rf_state;
+ }
+ bool is_ready_or_timeout();
bool is_producer() const { return _role == RuntimeFilterRole::PRODUCER; }
bool is_consumer() const { return _role == RuntimeFilterRole::CONSUMER; }
@@ -277,9 +294,25 @@ protected:
std::string _format_status() {
return fmt::format(
- "[IsPushDown = {}, IsEffective = {}, IsIgnored = {}, HasRemoteTarget = {}, "
+ "[IsPushDown = {}, RuntimeFilterState = {}, IsIgnored = {}, HasRemoteTarget = {}, "
"HasLocalTarget = {}]",
- _is_push_down, _is_ready, _is_ignored, _has_remote_target, _has_local_target);
+ _is_push_down, _get_explain_state_string(), _is_ignored, _has_remote_target,
+ _has_local_target);
+ }
+
+ std::string _get_explain_state_string() {
+ if (_state->enable_pipeline_exec()) {
+ return _rf_state_atomic.load(std::memory_order_acquire) == RuntimeFilterState::READY
+ ? "READY"
+ : _rf_state_atomic.load(std::memory_order_acquire) ==
+ RuntimeFilterState::TIME_OUT
+ ? "TIME_OUT"
+ : "NOT_READY";
+ } else {
+ return _rf_state == RuntimeFilterState::READY ? "READY"
+ : _rf_state == RuntimeFilterState::TIME_OUT ? "TIME_OUT"
+ : "NOT_READY";
+ }
}
RuntimeState* _state;
@@ -298,7 +331,8 @@ protected:
// will apply to local node
bool _has_local_target;
// filter is ready for consumer
- bool _is_ready;
+ RuntimeFilterState _rf_state;
+ std::atomic<RuntimeFilterState> _rf_state_atomic;
// role consumer or producer
RuntimeFilterRole _role;
// expr index
diff --git a/be/src/exprs/runtime_filter_rpc.cpp b/be/src/exprs/runtime_filter_rpc.cpp
index ed147db2be..faaee410d0 100644
--- a/be/src/exprs/runtime_filter_rpc.cpp
+++ b/be/src/exprs/runtime_filter_rpc.cpp
@@ -58,6 +58,7 @@ Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress
pfragment_instance_id->set_lo(state->fragment_instance_id().lo);
_rpc_context->request.set_filter_id(_filter_id);
+ _rpc_context->request.set_is_pipeline(state->enable_pipeline_exec());
_rpc_context->cntl.set_timeout_ms(1000);
_rpc_context->cid = _rpc_context->cntl.call_id();
diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp
index 770937c8bd..3b78b59e7c 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -50,19 +50,4 @@ std::string OperatorBase::debug_string() const {
return ss.str();
}
-/////////////////////////////////////// OperatorBuilder ////////////////////////////////////////////////////////////
-
-Status OperatorBuilderBase::prepare(doris::RuntimeState* state) {
- _state = state;
- // runtime filter, now dispose by NewOlapScanNode
- return Status::OK();
-}
-
-void OperatorBuilderBase::close(doris::RuntimeState* state) {
- if (_is_closed) {
- return;
- }
- _is_closed = true;
-}
-
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 8196c8e1dc..4679832bb1 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -87,10 +87,6 @@ public:
virtual bool is_sink() const { return false; }
virtual bool is_source() const { return false; }
- virtual Status prepare(RuntimeState* state);
-
- virtual void close(RuntimeState* state);
-
std::string get_name() const { return _name; }
RuntimeState* runtime_state() { return _state; }
@@ -184,6 +180,8 @@ public:
virtual bool can_read() { return false; } // for source
+ virtual bool runtime_filters_are_ready_or_timeout() { return true; } // for source
+
virtual bool can_write() { return false; } // for sink
/**
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index f28c7284fe..f9f2125878 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -24,22 +24,14 @@ namespace doris::pipeline {
OPERATOR_CODE_GENERATOR(ScanOperator, SourceOperator)
-Status ScanOperator::open(RuntimeState* state) {
- SCOPED_TIMER(_runtime_profile->total_time_counter());
- RETURN_IF_ERROR(SourceOperator::open(state));
- return _node->open(state);
-}
-
bool ScanOperator::can_read() {
- if (_node->_eos || !_node->_scanner_ctx || _node->_scanner_ctx->done() ||
- _node->_scanner_ctx->can_finish()) {
+ if (_node->_eos || _node->_scanner_ctx->done() || _node->_scanner_ctx->can_finish()) {
// _eos: need eos
- // !_scanner_ctx: need call open
// _scanner_ctx->done(): need finish
// _scanner_ctx->can_finish(): should be scheduled
return true;
} else {
- return !_node->_scanner_ctx->empty_in_queue(); // have block to process
+ return !_node->_scanner_ctx->empty_in_queue(); // there are some blocks to process
}
}
@@ -47,10 +39,8 @@ bool ScanOperator::is_pending_finish() const {
return _node->_scanner_ctx && !_node->_scanner_ctx->can_finish();
}
-Status ScanOperator::close(RuntimeState* state) {
- RETURN_IF_ERROR(SourceOperator::close(state));
- _node->close(state);
- return Status::OK();
+bool ScanOperator::runtime_filters_are_ready_or_timeout() {
+ return _node->runtime_filters_are_ready_or_timeout();
}
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h
index 964625fa7d..f71a04af8a 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -42,9 +42,7 @@ public:
bool is_pending_finish() const override;
- Status open(RuntimeState* state) override;
-
- Status close(RuntimeState* state) override;
+ bool runtime_filters_are_ready_or_timeout() override;
};
} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index 7c956a5e9d..c5a94ba640 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -21,16 +21,11 @@
namespace doris::pipeline {
-Status Pipeline::prepare(RuntimeState* state) {
+void Pipeline::_init_profile() {
std::stringstream ss;
ss << "Pipeline"
<< " (pipeline id=" << _pipeline_id << ")";
_pipeline_profile.reset(new RuntimeProfile(ss.str()));
- for (auto& op : _operator_builders) {
- RETURN_IF_ERROR(op->prepare(state));
- }
- RETURN_IF_ERROR(_sink->prepare(state));
- return Status::OK();
}
Status Pipeline::build_operators(Operators& operators) {
@@ -46,13 +41,6 @@ Status Pipeline::build_operators(Operators& operators) {
return Status::OK();
}
-void Pipeline::close(RuntimeState* state) {
- for (auto& op : _operator_builders) {
- op->close(state);
- }
- _sink->close(state);
-}
-
Status Pipeline::add_operator(OperatorBuilderPtr& op) {
if (_operator_builders.empty() && !op->is_source()) {
return Status::InternalError("Should set source before other operator");
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 595f16ba63..802c169d96 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -37,21 +37,17 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
public:
Pipeline() = delete;
explicit Pipeline(PipelineId pipeline_id, std::weak_ptr<PipelineFragmentContext> context)
- : _complete_dependency(0), _pipeline_id(pipeline_id), _context(context) {}
-
- Status prepare(RuntimeState* state);
-
- void close(RuntimeState*);
+ : _complete_dependency(0), _pipeline_id(pipeline_id), _context(context) {
+ _init_profile();
+ }
void add_dependency(std::shared_ptr<Pipeline>& pipeline) {
pipeline->_parents.push_back(shared_from_this());
_dependencies.push_back(pipeline);
}
- // If all dependency be finished, the pipeline task shoule be scheduled
- // pipeline is finish must call the parents `finish_one_dependency`
- // like the condition variables.
- // Eg: hash build finish must call the hash probe the method
+ // If all dependencies are finished, this pipeline task should be scheduled.
+ // e.g. Hash join probe task will be scheduled once Hash join build task is finished.
bool finish_one_dependency() {
DCHECK(_complete_dependency < _dependencies.size());
return _complete_dependency.fetch_add(1) == _dependencies.size() - 1;
@@ -70,6 +66,7 @@ public:
RuntimeProfile* runtime_profile() { return _pipeline_profile.get(); }
private:
+ void _init_profile();
std::atomic<uint32_t> _complete_dependency;
OperatorBuilders _operator_builders; // left is _source, right is _root
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index ecb79bb2d0..d87afb8321 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -272,10 +272,6 @@ Status PipelineFragmentContext::prepare(const doris::TExecPlanFragmentParams& re
Status PipelineFragmentContext::_build_pipeline_tasks(
const doris::TExecPlanFragmentParams& request) {
- for (auto& pipeline : _pipelines) {
- RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
- }
-
for (PipelinePtr& pipeline : _pipelines) {
// if sink
auto sink = pipeline->sink()->build_operator();
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 5302cc83b5..5bc58145c0 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -77,6 +77,11 @@ public:
std::string to_http_path(const std::string& file_name);
+ void set_merge_controller_handler(
+ std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
+ _merge_controller_handler = handler;
+ }
+
void send_report(bool);
private:
@@ -118,6 +123,7 @@ private:
// If set the true, this plan fragment will be executed only after FE send execution start rpc.
bool _need_wait_execution_trigger = false;
+ std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler;
MonotonicStopWatch _fragment_watcher;
// RuntimeProfile::Counter* _start_timer;
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 250dfc2bab..96fbd40f90 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -47,23 +47,13 @@ Status PipelineTask::prepare(RuntimeState* state) {
RETURN_IF_ERROR(o->prepare(state));
}
_block.reset(new doris::vectorized::Block());
- _init_state();
+
+ // We should make sure initial state for task are runnable so that we can do some preparation jobs (e.g. initialize runtime filters).
+ set_state(RUNNABLE);
_prepared = true;
return Status::OK();
}
-void PipelineTask::_init_state() {
- if (has_dependency()) {
- set_state(BLOCKED_FOR_DEPENDENCY);
- } else if (!(_source->can_read())) {
- set_state(BLOCKED_FOR_SOURCE);
- } else if (!(_sink->can_write())) {
- set_state(BLOCKED_FOR_SINK);
- } else {
- set_state(RUNNABLE);
- }
-}
-
bool PipelineTask::has_dependency() {
if (_dependency_finish) {
return false;
@@ -86,12 +76,12 @@ bool PipelineTask::has_dependency() {
}
Status PipelineTask::open() {
- if (_sink) {
- RETURN_IF_ERROR(_sink->open(_state));
- }
for (auto& o : _operators) {
RETURN_IF_ERROR(o->open(_state));
}
+ if (_sink) {
+ RETURN_IF_ERROR(_sink->open(_state));
+ }
_opened = true;
return Status::OK();
}
@@ -103,6 +93,19 @@ Status PipelineTask::execute(bool* eos) {
// The status must be runnable
*eos = false;
if (!_opened) {
+ {
+ SCOPED_RAW_TIMER(&time_spent);
+ auto st = open();
+ if (st.is_blocked_by_rf()) {
+ set_state(BLOCKED_FOR_RF);
+ return Status::OK();
+ }
+ RETURN_IF_ERROR(st);
+ }
+ if (has_dependency()) {
+ set_state(BLOCKED_FOR_DEPENDENCY);
+ return Status::OK();
+ }
if (!_source->can_read()) {
set_state(BLOCKED_FOR_SOURCE);
return Status::OK();
@@ -111,8 +114,6 @@ Status PipelineTask::execute(bool* eos) {
set_state(BLOCKED_FOR_SINK);
return Status::OK();
}
- SCOPED_RAW_TIMER(&time_spent);
- RETURN_IF_ERROR(open());
}
while (!_fragment_context->is_canceled()) {
@@ -168,7 +169,6 @@ Status PipelineTask::close() {
COUNTER_UPDATE(_wait_worker_timer, _wait_worker_watcher.elapsed_time());
COUNTER_UPDATE(_wait_schedule_timer, _wait_schedule_watcher.elapsed_time());
}
- _pipeline->close(_state);
return s;
}
@@ -204,14 +204,14 @@ void PipelineTask::set_state(PipelineTaskState state) {
}
std::string PipelineTask::debug_string() const {
- std::stringstream ss;
- ss << "PipelineTask(" << _index << ")" << get_state_name(_cur_state) << "\nsink: ";
- ss << _sink->debug_string();
- ss << "\n operators(from source to root)";
- for (auto operatr : _operators) {
- ss << "\n" << operatr->debug_string();
+ fmt::memory_buffer debug_string_buffer;
+ fmt::format_to(debug_string_buffer, "PipelineTask[id = {}, state = {}]\noperators: ", _index,
+ get_state_name(_cur_state));
+ for (size_t i = 0; i < _operators.size(); i++) {
+ fmt::format_to(debug_string_buffer, "\n{}{}", std::string(i * 2, ' '),
+ _operators[i]->debug_string());
}
- return ss.str();
+ return fmt::to_string(debug_string_buffer);
}
} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 34cce073e6..213aaf2507 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -57,7 +57,8 @@ enum PipelineTaskState : uint8_t {
PENDING_FINISH =
5, // compute task is over, but still hold resource. like some scan and sink task
FINISHED = 6,
- CANCELED = 7
+ CANCELED = 7,
+ BLOCKED_FOR_RF = 8,
};
inline const char* get_state_name(PipelineTaskState idx) {
@@ -78,6 +79,8 @@ inline const char* get_state_name(PipelineTaskState idx) {
return "FINISHED";
case PipelineTaskState::CANCELED:
return "CANCELED";
+ case PipelineTaskState::BLOCKED_FOR_RF:
+ return "BLOCKED_FOR_RF";
}
__builtin_unreachable();
}
@@ -117,21 +120,15 @@ public:
PipelineTaskState get_state() { return _cur_state; }
void set_state(PipelineTaskState state);
- bool is_blocking_state() {
- switch (_cur_state) {
- case BLOCKED_FOR_DEPENDENCY:
- case BLOCKED_FOR_SOURCE:
- case BLOCKED_FOR_SINK:
- return true;
- default:
- return false;
- }
- }
bool is_pending_finish() { return _source->is_pending_finish() || _sink->is_pending_finish(); }
bool source_can_read() { return _source->can_read(); }
+ bool runtime_filters_are_ready_or_timeout() {
+ return _source->runtime_filters_are_ready_or_timeout();
+ }
+
bool sink_can_write() { return _sink->can_write(); }
Status finalize();
@@ -165,7 +162,6 @@ public:
private:
Status open();
void _init_profile();
- void _init_state();
uint32_t _index;
PipelinePtr _pipeline;
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 18e5ce2449..c74be19f0d 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -120,6 +120,12 @@ void BlockedTaskScheduler::_schedule() {
} else {
iter++;
}
+ } else if (state == BLOCKED_FOR_RF) {
+ if (task->runtime_filters_are_ready_or_timeout()) {
+ _make_task_run(local_blocked_tasks, iter, ready_tasks);
+ } else {
+ iter++;
+ }
} else if (state == BLOCKED_FOR_SINK) {
if (task->sink_can_write()) {
_make_task_run(local_blocked_tasks, iter, ready_tasks);
@@ -193,11 +199,7 @@ Status TaskScheduler::start() {
}
Status TaskScheduler::schedule_task(PipelineTask* task) {
- if (task->is_blocking_state()) {
- _blocked_task_scheduler->add_blocked_task(task);
- } else {
- _task_queue->push_back(task);
- }
+ _task_queue->push_back(task);
// TODO control num of task
return Status::OK();
}
@@ -225,8 +227,7 @@ void TaskScheduler::_do_work(size_t index) {
auto check_state = task->get_state();
if (check_state == PENDING_FINISH) {
- bool is_pending = task->is_pending_finish();
- DCHECK(!is_pending) << "must not pending close " << task->debug_string();
+ DCHECK(!task->is_pending_finish()) << "must not pending close " << task->debug_string();
_try_close_task(task, canceled ? CANCELED : FINISHED);
continue;
}
@@ -245,7 +246,8 @@ void TaskScheduler::_do_work(size_t index) {
auto status = task->execute(&eos);
task->set_previous_core_id(index);
if (!status.ok()) {
- LOG(WARNING) << "Pipeline taks execute task fail " << task->debug_string();
+ LOG(WARNING) << "Pipeline task execute task fail " << task->debug_string() << " "
+ << status.to_string();
// exec failed,cancel all fragment instance
fragment_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "execute fail");
_try_close_task(task, CANCELED);
@@ -272,6 +274,8 @@ void TaskScheduler::_do_work(size_t index) {
switch (pipeline_state) {
case BLOCKED_FOR_SOURCE:
case BLOCKED_FOR_SINK:
+ case BLOCKED_FOR_RF:
+ case BLOCKED_FOR_DEPENDENCY:
_blocked_task_scheduler->add_blocked_task(task);
break;
case RUNNABLE:
@@ -295,6 +299,11 @@ void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state)
if (!status.ok()) {
// TODO: LOG warning
}
+ if (task->is_pending_finish()) {
+ task->set_state(PENDING_FINISH);
+ _blocked_task_scheduler->add_blocked_task(task);
+ return;
+ }
task->set_state(state);
// TODO: rethink the logic
if (state == CANCELED) {
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index b0d00fb99f..0037ec255a 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -714,7 +714,6 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
RETURN_IF_ERROR(exec_state->prepare(params));
}
g_fragmentmgr_prepare_latency << (duration_ns / 1000);
- // TODO: Support runtime filter on pipeline engine
std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
_runtimefilter_controller.add_entity(params, &handler,
exec_state->executor()->runtime_state());
@@ -757,6 +756,10 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
g_fragmentmgr_prepare_latency << (duration_ns / 1000);
}
+ std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
+ _runtimefilter_controller.add_entity(params, &handler, context->get_runtime_state());
+ context->set_merge_controller_handler(handler);
+
{
std::lock_guard<std::mutex> lock(_lock);
_pipeline_map.insert(std::make_pair(fragment_instance_id, context));
@@ -1028,12 +1031,34 @@ Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params,
Status FragmentMgr::apply_filter(const PPublishFilterRequest* request,
butil::IOBufAsZeroCopyInputStream* attach_data) {
- // TODO pipeline runtime filter
+ bool is_pipeline = request->has_is_pipeline() && request->is_pipeline();
+
UniqueId fragment_instance_id = request->fragment_id();
TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
+
std::shared_ptr<FragmentExecState> fragment_state;
+ std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
- {
+ RuntimeFilterMgr* runtime_filter_mgr = nullptr;
+ if (is_pipeline) {
+ std::unique_lock<std::mutex> lock(_lock);
+
+ if (!_pipeline_map.count(tfragment_instance_id)) {
+ VLOG_NOTICE << "wait for fragment start execute, fragment-id:" << fragment_instance_id;
+ _cv.wait_for(lock, std::chrono::milliseconds(1000),
+ [&] { return _pipeline_map.count(tfragment_instance_id); });
+ }
+
+ auto iter = _pipeline_map.find(tfragment_instance_id);
+ if (iter == _pipeline_map.end()) {
+ VLOG_CRITICAL << "unknown.... fragment-id:" << fragment_instance_id;
+ return Status::InvalidArgument("fragment-id: {}", fragment_instance_id.to_string());
+ }
+ pip_context = iter->second;
+
+ DCHECK(pip_context != nullptr);
+ runtime_filter_mgr = pip_context->get_runtime_state()->runtime_filter_mgr();
+ } else {
std::unique_lock<std::mutex> lock(_lock);
if (!_fragment_map.count(tfragment_instance_id)) {
VLOG_NOTICE << "wait for fragment start execute, fragment-id:" << fragment_instance_id;
@@ -1047,26 +1072,37 @@ Status FragmentMgr::apply_filter(const PPublishFilterRequest* request,
return Status::InvalidArgument("fragment-id: {}", fragment_instance_id.to_string());
}
fragment_state = iter->second;
- }
- DCHECK(fragment_state != nullptr);
- RuntimeFilterMgr* runtime_filter_mgr =
- fragment_state->executor()->runtime_state()->runtime_filter_mgr();
+ DCHECK(fragment_state != nullptr);
+ runtime_filter_mgr = fragment_state->executor()->runtime_state()->runtime_filter_mgr();
+ }
- Status st = runtime_filter_mgr->update_filter(request, attach_data);
- return st;
+ return runtime_filter_mgr->update_filter(request, attach_data);
}
Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
butil::IOBufAsZeroCopyInputStream* attach_data) {
UniqueId queryid = request->query_id();
+ bool is_pipeline = request->has_is_pipeline() && request->is_pipeline();
std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, &filter_controller));
auto fragment_instance_id = filter_controller->instance_id();
TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
std::shared_ptr<FragmentExecState> fragment_state;
- {
+ std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
+ if (is_pipeline) {
+ std::lock_guard<std::mutex> lock(_lock);
+ auto iter = _pipeline_map.find(tfragment_instance_id);
+ if (iter == _pipeline_map.end()) {
+ VLOG_CRITICAL << "unknown fragment-id:" << fragment_instance_id;
+ return Status::InvalidArgument("fragment-id: {}", fragment_instance_id.to_string());
+ }
+
+ // hold reference to pip_context, or else runtime_state can be destroyed
+ // when filter_controller->merge is still in progress
+ pip_context = iter->second;
+ } else {
std::unique_lock<std::mutex> lock(_lock);
auto iter = _fragment_map.find(tfragment_instance_id);
if (iter == _fragment_map.end()) {
diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp
index ce851c119a..bf0bfd0d9d 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -238,6 +238,8 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
size_t cur = rpc_contexts.size() - 1;
rpc_contexts[cur]->request = apply_request;
rpc_contexts[cur]->request.set_filter_id(request->filter_id());
+ rpc_contexts[cur]->request.set_is_pipeline(request->has_is_pipeline() &&
+ request->is_pipeline());
*rpc_contexts[cur]->request.mutable_query_id() = request->query_id();
if (has_attachment) {
rpc_contexts[cur]->cntl.request_attachment().append(request_attachment);
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 5af2f7df55..f01156e893 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -323,7 +323,10 @@ public:
}
return _query_options.be_exec_version;
}
- bool enable_pipeline_exec() const { return _query_options.enable_pipeline_engine; }
+ bool enable_pipeline_exec() const {
+ return _query_options.__isset.enable_pipeline_engine &&
+ _query_options.enable_pipeline_engine;
+ }
bool trim_tailing_spaces_for_external_table_query() const {
return _query_options.trim_tailing_spaces_for_external_table_query;
diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp
index 9db08c937c..cf017da7c6 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -84,13 +84,22 @@ Status VScanNode::prepare(RuntimeState* state) {
Status VScanNode::open(RuntimeState* state) {
SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
- _input_tuple_desc = state->desc_tbl().get_tuple_descriptor(_input_tuple_id);
- _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
START_AND_SCOPE_SPAN(state->get_tracer(), span, "VScanNode::open");
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_CANCELLED(state);
- RETURN_IF_ERROR(ExecNode::open(state));
+ return ExecNode::open(state);
+}
+Status VScanNode::alloc_resource(RuntimeState* state) {
+ if (_opened) {
+ return Status::OK();
+ }
+ SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
+ _input_tuple_desc = state->desc_tbl().get_tuple_descriptor(_input_tuple_id);
+ _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
+ START_AND_SCOPE_SPAN(state->get_tracer(), span, "VScanNode::alloc_resource");
+ SCOPED_TIMER(_runtime_profile->total_time_counter());
+ RETURN_IF_ERROR(ExecNode::alloc_resource(state));
RETURN_IF_ERROR(_acquire_runtime_filter());
RETURN_IF_ERROR(_process_conjuncts());
@@ -102,6 +111,8 @@ Status VScanNode::open(RuntimeState* state) {
COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size()));
RETURN_IF_ERROR(_start_scanners(scanners));
}
+ RETURN_IF_CANCELLED(state);
+ _opened = true;
return Status::OK();
}
@@ -213,6 +224,20 @@ Status VScanNode::_register_runtime_filter() {
return Status::OK();
}
+bool VScanNode::runtime_filters_are_ready_or_timeout() {
+ if (!_blocked_by_rf) {
+ return true;
+ }
+ for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
+ IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
+ if (!runtime_filter->is_ready_or_timeout()) {
+ return false;
+ }
+ }
+ _blocked_by_rf = false;
+ return true;
+}
+
Status VScanNode::_acquire_runtime_filter(bool wait) {
SCOPED_TIMER(_acquire_runtime_filter_timer);
std::vector<VExpr*> vexprs;
@@ -229,14 +254,23 @@ Status VScanNode::_acquire_runtime_filter(bool wait) {
if (!ready && wait) {
ready = runtime_filter->await();
}
- if (ready) {
+ if (ready && !_runtime_filter_ctxs[i].apply_mark) {
RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&vexprs));
_runtime_filter_ctxs[i].apply_mark = true;
- } else {
+ } else if ((wait || !runtime_filter->is_ready_or_timeout()) &&
+ runtime_filter->current_state() == RuntimeFilterState::NOT_READY &&
+ !_runtime_filter_ctxs[i].apply_mark) {
+ _blocked_by_rf = true;
+ } else if (!_runtime_filter_ctxs[i].apply_mark) {
+ DCHECK(!_blocked_by_rf &&
+ runtime_filter->current_state() != RuntimeFilterState::NOT_READY);
_is_all_rf_applied = false;
}
}
RETURN_IF_ERROR(_append_rf_into_conjuncts(vexprs));
+ if (_blocked_by_rf) {
+ return Status::WaitForRf("Runtime filters are neither not ready nor timeout");
+ }
return Status::OK();
}
@@ -304,6 +338,12 @@ Status VScanNode::close(RuntimeState* state) {
return Status::OK();
}
START_AND_SCOPE_SPAN(state->get_tracer(), span, "VScanNode::close");
+ RETURN_IF_ERROR(ExecNode::close(state));
+ return Status::OK();
+}
+
+void VScanNode::release_resource(RuntimeState* state) {
+ START_AND_SCOPE_SPAN(state->get_tracer(), span, "VScanNode::release_resource");
if (_scanner_ctx.get()) {
// stop and wait the scanner scheduler to be done
// _scanner_ctx may not be created for some short circuit case.
@@ -321,8 +361,7 @@ Status VScanNode::close(RuntimeState* state) {
}
_scanner_pool.clear();
- RETURN_IF_ERROR(ExecNode::close(state));
- return Status::OK();
+ ExecNode::release_resource(state);
}
Status VScanNode::_normalize_conjuncts() {
diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h
index 3e6132bc29..2f6e5c6b4a 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -91,6 +91,10 @@ public:
const TupleDescriptor* input_tuple_desc() const { return _input_tuple_desc; }
const TupleDescriptor* output_tuple_desc() const { return _output_tuple_desc; }
+ Status alloc_resource(RuntimeState* state) override;
+ void release_resource(RuntimeState* state) override;
+ bool runtime_filters_are_ready_or_timeout();
+
enum class PushDownType {
// The predicate can not be pushed down to data source
UNACCEPTABLE,
@@ -202,6 +206,7 @@ protected:
// indicate this scan node has no more data to return
bool _eos = false;
+ bool _opened = false;
FilterPredicates _filter_predicates {};
@@ -223,6 +228,7 @@ protected:
std::vector<ColumnValueRangeType> _not_in_value_ranges;
bool _need_agg_finalize = true;
+ bool _blocked_by_rf = false;
// Every time vconjunct_ctx_ptr is updated, the old ctx will be stored in this vector
// so that it will be destroyed uniformly at the end of the query.
diff --git a/be/src/vec/exec/vexchange_node.cpp b/be/src/vec/exec/vexchange_node.cpp
index ba73754404..8e34b1e444 100644
--- a/be/src/vec/exec/vexchange_node.cpp
+++ b/be/src/vec/exec/vexchange_node.cpp
@@ -31,6 +31,7 @@ VExchangeNode::VExchangeNode(ObjectPool* pool, const TPlanNode& tnode, const Des
: ExecNode(pool, tnode, descs),
_num_senders(0),
_is_merging(tnode.exchange_node.__isset.sort_info),
+ _is_ready(false),
_stream_recvr(nullptr),
_input_row_desc(descs, tnode.exchange_node.input_row_tuples,
std::vector<bool>(tnode.nullable_tuples.begin(),
@@ -70,9 +71,11 @@ Status VExchangeNode::alloc_resource(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::alloc_resource(state));
if (_is_merging) {
RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
- RETURN_IF_ERROR(_stream_recvr->create_merger(_vsort_exec_exprs.lhs_ordering_expr_ctxs(),
- _is_asc_order, _nulls_first,
- state->batch_size(), _limit, _offset));
+ if (!state->enable_pipeline_exec()) {
+ RETURN_IF_ERROR(_stream_recvr->create_merger(_vsort_exec_exprs.lhs_ordering_expr_ctxs(),
+ _is_asc_order, _nulls_first,
+ state->batch_size(), _limit, _offset));
+ }
}
return Status::OK();
}
@@ -93,6 +96,13 @@ Status VExchangeNode::get_next(RuntimeState* state, Block* block, bool* eos) {
INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VExchangeNode::get_next");
SCOPED_TIMER(runtime_profile()->total_time_counter());
SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
+ if (_is_merging && state->enable_pipeline_exec() && !_is_ready) {
+ RETURN_IF_ERROR(_stream_recvr->create_merger(_vsort_exec_exprs.lhs_ordering_expr_ctxs(),
+ _is_asc_order, _nulls_first,
+ state->batch_size(), _limit, _offset));
+ _is_ready = true;
+ return Status::OK();
+ }
auto status = _stream_recvr->get_next(block, eos);
if (block != nullptr) {
if (_num_rows_returned + block->rows() < _limit) {
diff --git a/be/src/vec/exec/vexchange_node.h b/be/src/vec/exec/vexchange_node.h
index dc633cafd0..1d767c8cb2 100644
--- a/be/src/vec/exec/vexchange_node.h
+++ b/be/src/vec/exec/vexchange_node.h
@@ -51,6 +51,7 @@ public:
private:
int _num_senders;
bool _is_merging;
+ bool _is_ready;
std::shared_ptr<VDataStreamRecvr> _stream_recvr;
RowDescriptor _input_row_desc;
std::shared_ptr<QueryStatisticsRecvr> _sub_plan_query_statistics_recvr;
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h
index 386b8ef41f..e8b70bbeca 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -217,7 +217,9 @@ public:
void _update_block_queue_empty() override { _block_queue_empty = _block_queue.empty(); }
Status get_batch(Block** next_block) override {
- CHECK(!should_wait());
+ CHECK(!should_wait()) << " _is_cancelled: " << _is_cancelled
+ << ", _block_queue_empty: " << _block_queue_empty
+ << ", _num_remaining_senders: " << _num_remaining_senders;
std::lock_guard<std::mutex> l(_lock); // protect _block_queue
return _inner_get_batch(next_block);
}
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index bf61b22707..ff05509e67 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -438,6 +438,7 @@ message PMergeFilterRequest {
optional PMinMaxFilter minmax_filter = 5;
optional PBloomFilter bloom_filter = 6;
optional PInFilter in_filter = 7;
+ optional bool is_pipeline = 8;
};
message PMergeFilterResponse {
@@ -452,6 +453,7 @@ message PPublishFilterRequest {
optional PMinMaxFilter minmax_filter = 5;
optional PBloomFilter bloom_filter = 6;
optional PInFilter in_filter = 7;
+ optional bool is_pipeline = 8;
};
message PPublishFilterResponse {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org