You are viewing a plain-text version of this content; the canonical link to it (a hyperlink on the original archive page) is not preserved in this text.
Posted to commits@doris.apache.org by li...@apache.org on 2023/06/13 03:20:56 UTC
[doris] branch master updated: [Chore](runtime filter) remove runtime filter ready_for_publish/publish_finally (#20593)
This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e010fa8d4f [Chore](runtime filter) remove runtime filter ready_for_publish/publish_finally (#20593)
e010fa8d4f is described below
commit e010fa8d4ff17e0800c9b71fd0b1701b7d205afb
Author: Pxl <px...@qq.com>
AuthorDate: Tue Jun 13 11:20:49 2023 +0800
[Chore](runtime filter) remove runtime filter ready_for_publish/publish_finally (#20593)
---
be/src/common/config.cpp | 4 ---
be/src/common/config.h | 4 ---
be/src/exprs/runtime_filter.cpp | 29 -------------------
be/src/exprs/runtime_filter.h | 15 +++++-----
be/src/exprs/runtime_filter_rpc.cpp | 35 +++++++++++++----------
be/src/exprs/runtime_filter_slots.h | 37 ++++++++++++++-----------
be/src/exprs/runtime_filter_slots_cross.h | 10 +++----
be/src/pipeline/exec/hashjoin_build_sink.h | 1 +
be/src/runtime/runtime_filter_mgr.cpp | 8 +++---
be/src/vec/exec/join/vhash_join_node.cpp | 17 +++++++-----
be/src/vec/exec/join/vhash_join_node.h | 9 +++++-
be/src/vec/exec/join/vnested_loop_join_node.cpp | 2 +-
12 files changed, 77 insertions(+), 94 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index f23bbffe99..f3ea211a8e 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -732,10 +732,6 @@ DEFINE_mInt32(mem_tracker_consume_min_size_bytes, "1048576");
// In most cases, it does not need to be modified.
DEFINE_mDouble(tablet_version_graph_orphan_vertex_ratio, "0.1");
-// if set runtime_filter_use_async_rpc true, publish runtime filter will be a async method
-// else we will call sync method
-DEFINE_mBool(runtime_filter_use_async_rpc, "true");
-
// max send batch parallelism for OlapTableSink
// The value set by the user for send_batch_parallelism is not allowed to exceed max_send_batch_parallelism_per_job,
// if exceed, the value of send_batch_parallelism would be max_send_batch_parallelism_per_job
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 7b61e13c92..4f6a608929 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -768,10 +768,6 @@ DECLARE_mInt32(mem_tracker_consume_min_size_bytes);
// In most cases, it does not need to be modified.
DECLARE_mDouble(tablet_version_graph_orphan_vertex_ratio);
-// if set runtime_filter_use_async_rpc true, publish runtime filter will be a async method
-// else we will call sync method
-DECLARE_mBool(runtime_filter_use_async_rpc);
-
// max send batch parallelism for OlapTableSink
// The value set by the user for send_batch_parallelism is not allowed to exceed max_send_batch_parallelism_per_job,
// if exceed, the value of send_batch_parallelism would be max_send_batch_parallelism_per_job
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index ef8543e136..1a4b183e5d 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1019,26 +1019,6 @@ public:
PrimitiveType column_type() { return _column_return_type; }
- void ready_for_publish() {
- if (_filter_type == RuntimeFilterType::MINMAX_FILTER) {
- switch (_column_return_type) {
- case TYPE_VARCHAR:
- case TYPE_CHAR:
- case TYPE_STRING: {
- StringRef* min_value = static_cast<StringRef*>(_context.minmax_func->get_min());
- StringRef* max_value = static_cast<StringRef*>(_context.minmax_func->get_max());
- auto min_val_ptr = _pool->add(new std::string(min_value->data));
- auto max_val_ptr = _pool->add(new std::string(max_value->data));
- StringRef min_val(min_val_ptr->c_str(), min_val_ptr->length());
- StringRef max_val(max_val_ptr->c_str(), max_val_ptr->length());
- _context.minmax_func->assign(&min_val, &max_val);
- }
- default:
- break;
- }
- }
- }
-
bool is_bloomfilter() const { return _is_bloomfilter; }
bool is_ignored_in_filter() const { return _is_ignored_in_filter; }
@@ -1164,11 +1144,6 @@ Status IRuntimeFilter::publish() {
}
}
-void IRuntimeFilter::publish_finally() {
- DCHECK(is_producer());
- join_rpc();
-}
-
Status IRuntimeFilter::get_push_expr_ctxs(std::vector<vectorized::VExprSPtr>* push_exprs) {
DCHECK(is_consumer());
if (!_is_ignored) {
@@ -1531,10 +1506,6 @@ void IRuntimeFilter::update_runtime_filter_type_to_profile() {
}
}
-void IRuntimeFilter::ready_for_publish() {
- _wrapper->ready_for_publish();
-}
-
Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) {
if (!_is_ignored && wrapper->is_ignored_in_filter()) {
set_ignored();
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 8389e8416b..6ff64a9af3 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -217,8 +217,6 @@ public:
// push filter to remote node or push down it to scan_node
Status publish();
- void publish_finally();
-
RuntimeFilterType type() const { return _runtime_filter_type; }
Status get_push_expr_ctxs(std::vector<vectorized::VExprSPtr>* push_exprs);
@@ -293,9 +291,12 @@ public:
// consumer should call before released
Status consumer_close();
+ bool is_finish_rpc();
+
+ Status join_rpc();
+
// async push runtimefilter to remote node
Status push_to_remote(RuntimeState* state, const TNetworkAddress* addr, bool opt_remote_rf);
- Status join_rpc();
void init_profile(RuntimeProfile* parent_profile);
@@ -303,8 +304,6 @@ public:
void update_runtime_filter_type_to_profile();
- void ready_for_publish();
-
static bool enable_use_batch(bool use_batch, PrimitiveType type) {
return use_batch && (is_int_or_bool(type) || is_float_or_double(type));
}
@@ -389,9 +388,9 @@ protected:
std::vector<doris::vectorized::VExprSPtr> _push_down_vexprs;
- struct rpc_context;
+ struct RPCContext;
- std::shared_ptr<rpc_context> _rpc_context;
+ std::shared_ptr<RPCContext> _rpc_context;
// parent profile
// only effect on consumer
@@ -403,7 +402,7 @@ protected:
const bool _enable_pipeline_exec;
bool _profile_init = false;
- doris::Mutex _profile_mutex;
+ std::mutex _profile_mutex;
std::string _name;
bool _opt_remote_rf;
};
diff --git a/be/src/exprs/runtime_filter_rpc.cpp b/be/src/exprs/runtime_filter_rpc.cpp
index 829224f3f2..4d613e3a57 100644
--- a/be/src/exprs/runtime_filter_rpc.cpp
+++ b/be/src/exprs/runtime_filter_rpc.cpp
@@ -38,11 +38,14 @@
namespace doris {
-struct IRuntimeFilter::rpc_context {
+struct IRuntimeFilter::RPCContext {
PMergeFilterRequest request;
PMergeFilterResponse response;
brpc::Controller cntl;
brpc::CallId cid;
+ bool is_finished = false;
+
+ static void finish(std::shared_ptr<RPCContext> ctx) { ctx->is_finished = true; }
};
Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress* addr,
@@ -54,10 +57,9 @@ Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress
if (!stub) {
std::string msg =
fmt::format("Get rpc stub failed, host={}, port=", addr->hostname, addr->port);
- LOG(WARNING) << msg;
return Status::InternalError(msg);
}
- _rpc_context = std::make_shared<IRuntimeFilter::rpc_context>();
+ _rpc_context = std::make_shared<IRuntimeFilter::RPCContext>();
void* data = nullptr;
int len = 0;
@@ -72,7 +74,7 @@ Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress
_rpc_context->request.set_filter_id(_filter_id);
_rpc_context->request.set_opt_remote_rf(opt_remote_rf);
_rpc_context->request.set_is_pipeline(state->enable_pipeline_exec());
- _rpc_context->cntl.set_timeout_ms(1000);
+ _rpc_context->cntl.set_timeout_ms(state->runtime_filter_wait_time_ms());
_rpc_context->cid = _rpc_context->cntl.call_id();
Status serialize_status = serialize(&_rpc_context->request, &data, &len);
@@ -83,14 +85,9 @@ Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress
DCHECK(data != nullptr);
_rpc_context->cntl.request_attachment().append(data, len);
}
- if (config::runtime_filter_use_async_rpc) {
- stub->merge_filter(&_rpc_context->cntl, &_rpc_context->request, &_rpc_context->response,
- brpc::DoNothing());
- } else {
- stub->merge_filter(&_rpc_context->cntl, &_rpc_context->request, &_rpc_context->response,
- nullptr);
- _rpc_context.reset();
- }
+
+ stub->merge_filter(&_rpc_context->cntl, &_rpc_context->request, &_rpc_context->response,
+ brpc::NewCallback(RPCContext::finish, _rpc_context));
} else {
// we should reset context
@@ -99,15 +96,25 @@ Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress
return serialize_status;
}
+bool IRuntimeFilter::is_finish_rpc() {
+ if (_rpc_context == nullptr) {
+ return true;
+ }
+ return _rpc_context->is_finished;
+}
+
Status IRuntimeFilter::join_rpc() {
- DCHECK(is_producer());
+ if (!is_producer()) {
+ return Status::InternalError("RuntimeFilter::join_rpc only called when rf is producer.");
+ }
if (_rpc_context != nullptr) {
brpc::Join(_rpc_context->cid);
if (_rpc_context->cntl.Failed()) {
- LOG(WARNING) << "runtimefilter rpc err:" << _rpc_context->cntl.ErrorText();
// reset stub cache
ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
_rpc_context->cntl.remote_side());
+ return Status::InternalError("RuntimeFilter::join_rpc meet rpc error, msg={}.",
+ _rpc_context->cntl.ErrorText());
}
}
return Status::OK();
diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h
index b1f4b125b4..d6a5866cae 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -17,6 +17,7 @@
#pragma once
+#include "common/status.h"
#include "exprs/runtime_filter.h"
#include "runtime/runtime_filter_mgr.h"
#include "runtime/runtime_state.h"
@@ -60,8 +61,8 @@ public:
auto ignore_remote_filter = [](IRuntimeFilter* runtime_filter, std::string& msg) {
runtime_filter->set_ignored();
runtime_filter->set_ignored_msg(msg);
- runtime_filter->publish();
- runtime_filter->publish_finally();
+ RETURN_IF_ERROR(runtime_filter->publish());
+ return Status::OK();
};
// ordered vector: IN, IN_OR_BLOOM, others.
@@ -142,9 +143,9 @@ public:
"in_num({}) >= max_in_num({})",
print_id(state->fragment_instance_id()), filter_desc.filter_id,
hash_table_size, max_in_num);
- ignore_remote_filter(runtime_filter, msg);
+ RETURN_IF_ERROR(ignore_remote_filter(runtime_filter, msg));
#else
- ignore_remote_filter(runtime_filter, "ignored");
+ RETURN_IF_ERROR(ignore_remote_filter(runtime_filter, "ignored"));
#endif
continue;
}
@@ -196,29 +197,33 @@ public:
}
}
- // should call this method after insert
- void ready_for_publish() {
+ bool ready_finish_publish() {
for (auto& pair : _runtime_filters) {
for (auto filter : pair.second) {
- filter->ready_for_publish();
+ if (!filter->is_finish_rpc()) {
+ return false;
+ }
}
}
+ return true;
}
- // publish runtime filter
- void publish() {
- for (int i = 0; i < _probe_expr_context.size(); ++i) {
- auto iter = _runtime_filters.find(i);
- if (iter != _runtime_filters.end()) {
- for (auto filter : iter->second) {
- filter->publish();
- }
+
+ void finish_publish() {
+ for (auto& pair : _runtime_filters) {
+ for (auto filter : pair.second) {
+ filter->join_rpc();
}
}
+ }
+
+ // publish runtime filter
+ Status publish() {
for (auto& pair : _runtime_filters) {
for (auto filter : pair.second) {
- filter->publish_finally();
+ RETURN_IF_ERROR(filter->publish());
}
}
+ return Status::OK();
}
void copy_to_shared_context(vectorized::SharedHashTableContextPtr& context) {
diff --git a/be/src/exprs/runtime_filter_slots_cross.h b/be/src/exprs/runtime_filter_slots_cross.h
index 1e8c15e713..184ba46591 100644
--- a/be/src/exprs/runtime_filter_slots_cross.h
+++ b/be/src/exprs/runtime_filter_slots_cross.h
@@ -19,6 +19,7 @@
#include <vector>
+#include "common/status.h"
#include "exprs/runtime_filter.h"
#include "runtime/runtime_filter_mgr.h"
#include "runtime/runtime_state.h"
@@ -88,14 +89,11 @@ public:
return Status::OK();
}
- void publish() {
+ Status publish() {
for (auto& filter : _runtime_filters) {
- filter->publish();
- }
- for (auto& filter : _runtime_filters) {
- // todo: cross join may not need publish_finally()
- filter->publish_finally();
+ RETURN_IF_ERROR(filter->publish());
}
+ return Status::OK();
}
bool empty() { return !_runtime_filters.size(); }
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h
index dfa077b83f..c1e67512dd 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -39,6 +39,7 @@ class HashJoinBuildSink final : public StreamingOperator<HashJoinBuildSinkBuilde
public:
HashJoinBuildSink(OperatorBuilderBase* operator_builder, ExecNode* node);
bool can_write() override { return _node->can_sink_write(); }
+ bool is_pending_finish() const override { return !_node->ready_for_finish(); }
};
} // namespace pipeline
diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp
index b6dfa0a625..a566018c11 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -43,7 +43,7 @@
namespace doris {
template <class RPCRequest, class RPCResponse>
-struct async_rpc_context {
+struct AsyncRPCContext {
RPCRequest request;
RPCResponse response;
brpc::Controller cntl;
@@ -55,7 +55,7 @@ RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, RuntimeState* state
RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, QueryContext* query_ctx)
: _query_ctx(query_ctx) {}
-RuntimeFilterMgr::~RuntimeFilterMgr() {}
+RuntimeFilterMgr::~RuntimeFilterMgr() = default;
Status RuntimeFilterMgr::init() {
_tracker = std::make_unique<MemTracker>("RuntimeFilterMgr",
@@ -315,7 +315,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
// 2. FE has been upgraded (e.g. cntVal->targetv2_info.size() > 0)
// 3. This filter is bloom filter (only bloom filter should be used for merging)
using PPublishFilterRpcContext =
- async_rpc_context<PPublishFilterRequestV2, PPublishFilterResponse>;
+ AsyncRPCContext<PPublishFilterRequestV2, PPublishFilterResponse>;
std::vector<std::unique_ptr<PPublishFilterRpcContext>> rpc_contexts;
rpc_contexts.reserve(cntVal->targetv2_info.size());
@@ -380,7 +380,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
} else {
// prepare rpc context
using PPublishFilterRpcContext =
- async_rpc_context<PPublishFilterRequest, PPublishFilterResponse>;
+ AsyncRPCContext<PPublishFilterRequest, PPublishFilterResponse>;
std::vector<std::unique_ptr<PPublishFilterRpcContext>> rpc_contexts;
rpc_contexts.reserve(cntVal->target_info.size());
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index 822b99db58..9b410f56ae 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -285,9 +285,9 @@ struct ProcessRuntimeFilterBuild {
if (_join_node->_runtime_filter_descs.empty()) {
return Status::OK();
}
- _join_node->_runtime_filter_slots = _join_node->_pool->add(
- new VRuntimeFilterSlots(_join_node->_probe_expr_ctxs, _join_node->_build_expr_ctxs,
- _join_node->_runtime_filter_descs));
+ _join_node->_runtime_filter_slots = std::make_shared<VRuntimeFilterSlots>(
+ _join_node->_probe_expr_ctxs, _join_node->_build_expr_ctxs,
+ _join_node->_runtime_filter_descs);
RETURN_IF_ERROR(_join_node->_runtime_filter_slots->init(
state, hash_table_ctx.hash_table.get_size(), _join_node->_build_bf_cardinality));
@@ -300,7 +300,7 @@ struct ProcessRuntimeFilterBuild {
}
{
SCOPED_TIMER(_join_node->_push_down_timer);
- _join_node->_runtime_filter_slots->publish();
+ RETURN_IF_ERROR(_join_node->_runtime_filter_slots->publish());
}
return Status::OK();
@@ -917,15 +917,15 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc
if (_runtime_filter_descs.empty()) {
return Status::OK();
}
- _runtime_filter_slots = _pool->add(new VRuntimeFilterSlots(
+ _runtime_filter_slots = std::make_shared<VRuntimeFilterSlots>(
_probe_expr_ctxs, _build_expr_ctxs,
- _runtime_filter_descs));
+ _runtime_filter_descs);
RETURN_IF_ERROR(_runtime_filter_slots->init(
state, arg.hash_table.get_size(), 0));
RETURN_IF_ERROR(_runtime_filter_slots->copy_from_shared_context(
_shared_hash_table_context));
- _runtime_filter_slots->publish();
+ RETURN_IF_ERROR(_runtime_filter_slots->publish());
return Status::OK();
}},
*_hash_table_variants);
@@ -1261,6 +1261,9 @@ HashJoinNode::~HashJoinNode() {
// signal at here is abnormal
_shared_hashtable_controller->signal(id(), Status::Cancelled("signaled in destructor"));
}
+ if (_runtime_filter_slots != nullptr) {
+ _runtime_filter_slots->finish_publish();
+ }
}
void HashJoinNode::_release_mem() {
diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h
index 36398ba4ad..4e9cd40e9a 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -252,6 +252,13 @@ public:
bool should_build_hash_table() const { return _should_build_hash_table; }
+ bool ready_for_finish() {
+ if (_runtime_filter_slots == nullptr) {
+ return true;
+ }
+ return _runtime_filter_slots->ready_finish_publish();
+ }
+
private:
void _init_short_circuit_for_probe() override {
_short_circuit_for_probe =
@@ -341,7 +348,7 @@ private:
bool _is_broadcast_join = false;
bool _should_build_hash_table = true;
std::shared_ptr<SharedHashTableController> _shared_hashtable_controller = nullptr;
- VRuntimeFilterSlots* _runtime_filter_slots = nullptr;
+ std::shared_ptr<VRuntimeFilterSlots> _runtime_filter_slots = nullptr;
std::vector<SlotId> _hash_output_slot_ids;
std::vector<bool> _left_output_slot_flags;
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp
index 57ad81f120..491a9c6854 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.cpp
+++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp
@@ -84,7 +84,7 @@ struct RuntimeFilterBuild {
}
{
SCOPED_TIMER(_join_node->_push_down_timer);
- runtime_filter_slots.publish();
+ RETURN_IF_ERROR(runtime_filter_slots.publish());
}
return Status::OK();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org