You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2024/04/19 15:50:29 UTC

(doris) 08/08: [Chore](runtime-filter) adjust need_local_merge setting conditions (#33886)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 09b973db49d70c8a5be5e33aa99e575b09691f14
Author: Pxl <px...@qq.com>
AuthorDate: Fri Apr 19 23:49:09 2024 +0800

    [Chore](runtime-filter) adjust need_local_merge setting conditions (#33886)
---
 .../pipeline_x/pipeline_x_fragment_context.cpp     | 30 ++++++++--------------
 be/src/runtime/runtime_state.cpp                   |  3 ---
 2 files changed, 11 insertions(+), 22 deletions(-)

diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 5d49aaf408e..c62ed04e580 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -227,9 +227,8 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
         _query_ctx->init_runtime_predicates({0});
     }
 
-    _need_local_merge =
-            request.__isset.parallel_instances &&
-            (request.__isset.per_node_shared_scans && !request.per_node_shared_scans.empty());
+    _need_local_merge = request.__isset.parallel_instances;
+
     // 2. Build pipelines with operators in this fragment.
     auto root_pipeline = add_pipeline();
     RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_build_pipelines(
@@ -926,14 +925,13 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
     // Therefore, here we need to use a stack-like structure.
     _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
     std::stringstream error_msg;
+
     switch (tnode.node_type) {
     case TPlanNodeType::OLAP_SCAN_NODE: {
         op.reset(new OlapScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances));
         RETURN_IF_ERROR(cur_pipe->add_operator(op));
-        if (find_with_default(request.per_node_shared_scans, op->node_id(), false)) {
-            if (request.__isset.parallel_instances) {
-                cur_pipe->set_num_tasks(request.parallel_instances);
-            }
+        if (request.__isset.parallel_instances) {
+            cur_pipe->set_num_tasks(request.parallel_instances);
             op->set_ignore_data_distribution();
         }
         break;
@@ -947,10 +945,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
                     "Jdbc scan node is disabled, you can change be config enable_java_support "
                     "to true and restart be.");
         }
-        if (find_with_default(request.per_node_shared_scans, op->node_id(), false)) {
-            if (request.__isset.parallel_instances) {
-                cur_pipe->set_num_tasks(request.parallel_instances);
-            }
+        if (request.__isset.parallel_instances) {
+            cur_pipe->set_num_tasks(request.parallel_instances);
             op->set_ignore_data_distribution();
         }
         break;
@@ -958,10 +954,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
     case doris::TPlanNodeType::FILE_SCAN_NODE: {
         op.reset(new FileScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances));
         RETURN_IF_ERROR(cur_pipe->add_operator(op));
-        if (find_with_default(request.per_node_shared_scans, op->node_id(), false)) {
-            if (request.__isset.parallel_instances) {
-                cur_pipe->set_num_tasks(request.parallel_instances);
-            }
+        if (request.__isset.parallel_instances) {
+            cur_pipe->set_num_tasks(request.parallel_instances);
             op->set_ignore_data_distribution();
         }
         break;
@@ -970,10 +964,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
     case TPlanNodeType::ES_HTTP_SCAN_NODE: {
         op.reset(new EsScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances));
         RETURN_IF_ERROR(cur_pipe->add_operator(op));
-        if (find_with_default(request.per_node_shared_scans, op->node_id(), false)) {
-            if (request.__isset.parallel_instances) {
-                cur_pipe->set_num_tasks(request.parallel_instances);
-            }
+        if (request.__isset.parallel_instances) {
+            cur_pipe->set_num_tasks(request.parallel_instances);
             op->set_ignore_data_distribution();
         }
         break;
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index a541b2d2adc..75d06adc561 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -531,9 +531,6 @@ Status RuntimeState::register_producer_runtime_filter(const doris::TRuntimeFilte
                                                       bool need_local_merge,
                                                       doris::IRuntimeFilter** producer_filter,
                                                       bool build_bf_exactly) {
-    // If runtime filter need to be local merged, `build_bf_exactly` will lead to bloom filters with
-    // different size need to be merged which is not allowed.
-    // So if `need_local_merge` is true, we will disable `build_bf_exactly`.
     if (desc.has_remote_targets || need_local_merge) {
         return global_runtime_filter_mgr()->register_local_merge_producer_filter(
                 desc, query_options(), producer_filter, build_bf_exactly);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org