You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@doris.apache.org by li...@apache.org on 2022/12/13 07:33:15 UTC

[doris] branch master updated: [Refactor](exec) refactor the code of datasink eos logic (#15009)

This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 284a3351f4 [Refactor](exec) refactor the code of datasink eos logic (#15009)
284a3351f4 is described below

commit 284a3351f46db081865e5487cc605d8457c7d9c6
Author: HappenLee <ha...@hotmail.com>
AuthorDate: Tue Dec 13 15:33:08 2022 +0800

    [Refactor](exec) refactor the code of datasink eos logic (#15009)
---
 .../exec/nested_loop_join_probe_operator.h         |  1 -
 be/src/pipeline/exec/operator.h                    |  6 ++--
 be/src/runtime/plan_fragment_executor.cpp          | 42 +++++++++-------------
 be/src/runtime/plan_fragment_executor.h            |  3 +-
 be/src/util/mysql_load_error_hub.cpp               |  1 +
 be/src/vec/exec/join/vhash_join_node.h             |  2 +-
 6 files changed, 23 insertions(+), 32 deletions(-)

diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h b/be/src/pipeline/exec/nested_loop_join_probe_operator.h
index 4de351e441..bcbf08111c 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h
@@ -1,4 +1,3 @@
-
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 46c79c8470..8196c8e1dc 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -278,10 +278,10 @@ public:
     Status sink(RuntimeState* state, vectorized::Block* in_block,
                 SourceState source_state) override {
         SCOPED_TIMER(_runtime_profile->total_time_counter());
-        if (UNLIKELY(!in_block || in_block->rows() == 0)) {
-            return Status::OK();
+        if (in_block->rows() > 0) {
+            return _sink->send(state, in_block, source_state == SourceState::FINISHED);
         }
-        return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+        return Status::OK();
     }
 
     Status close(RuntimeState* state) override {
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index b21ce4c28f..29056701b9 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -216,7 +216,6 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
     _fragment_cpu_timer = ADD_TIMER(profile(), "FragmentCpuTime");
 
     _row_batch.reset(new RowBatch(_plan->row_desc(), _runtime_state->batch_size()));
-    _block.reset(new doris::vectorized::Block());
     // _row_batch->tuple_data_pool()->set_limits(*_runtime_state->mem_trackers());
     VLOG_NOTICE << "plan_root=\n" << _plan->debug_string();
     _prepared = true;
@@ -289,17 +288,15 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
 
     {
         auto sink_send_span_guard = Defer {[this]() { this->_sink->end_send_span(); }};
-        while (true) {
-            doris::vectorized::Block* block;
+        doris::vectorized::Block block;
+        bool eos = false;
+
+        while (!eos) {
             RETURN_IF_CANCELLED(_runtime_state);
 
             {
                 SCOPED_CPU_TIMER(_fragment_cpu_timer);
-                RETURN_IF_ERROR(get_vectorized_internal(&block));
-            }
-
-            if (block == nullptr) {
-                break;
+                RETURN_IF_ERROR(get_vectorized_internal(&block, &eos));
             }
 
             SCOPED_TIMER(profile()->total_time_counter());
@@ -309,11 +306,13 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
                 _collect_query_statistics();
             }
 
-            auto st = _sink->send(runtime_state(), block);
-            if (st.is<END_OF_FILE>()) {
-                break;
+            if (!eos || block.rows() > 0) {
+                auto st = _sink->send(runtime_state(), &block);
+                if (st.is<END_OF_FILE>()) {
+                    break;
+                }
+                RETURN_IF_ERROR(st);
             }
-            RETURN_IF_ERROR(st);
         }
     }
 
@@ -338,27 +337,20 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
     return Status::OK();
 }
 
-Status PlanFragmentExecutor::get_vectorized_internal(::doris::vectorized::Block** block) {
-    if (_done) {
-        *block = nullptr;
-        return Status::OK();
-    }
-
+Status PlanFragmentExecutor::get_vectorized_internal(::doris::vectorized::Block* block, bool* eos) {
     while (!_done) {
-        _block->clear_column_data(_plan->row_desc().num_materialized_slots());
+        block->clear_column_data(_plan->row_desc().num_materialized_slots());
         SCOPED_TIMER(profile()->total_time_counter());
         RETURN_IF_ERROR_AND_CHECK_SPAN(
-                _plan->get_next_after_projects(_runtime_state.get(), _block.get(), &_done),
+                _plan->get_next_after_projects(_runtime_state.get(), block, &_done),
                 _plan->get_next_span(), _done);
 
-        if (_block->rows() > 0) {
-            COUNTER_UPDATE(_rows_produced_counter, _block->rows());
-            *block = _block.get();
+        if (block->rows() > 0) {
+            COUNTER_UPDATE(_rows_produced_counter, block->rows());
             break;
         }
-
-        *block = nullptr;
     }
+    *eos = _done;
 
     return Status::OK();
 }
diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h
index af08aff0e3..7abf9fdc89 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -188,7 +188,6 @@ private:
     // Created in prepare (if required), owned by this object.
     std::unique_ptr<DataSink> _sink;
     std::unique_ptr<RowBatch> _row_batch;
-    std::unique_ptr<doris::vectorized::Block> _block;
 
     // Number of rows returned by this fragment
     RuntimeProfile::Counter* _rows_produced_counter;
@@ -236,7 +235,7 @@ private:
 
     // Executes get_next() logic and returns resulting status.
     Status get_next_internal(RowBatch** batch);
-    Status get_vectorized_internal(::doris::vectorized::Block** block);
+    Status get_vectorized_internal(::doris::vectorized::Block* block, bool* eos);
 
     // Stops report thread, if one is running. Blocks until report thread terminates.
     // Idempotent.
diff --git a/be/src/util/mysql_load_error_hub.cpp b/be/src/util/mysql_load_error_hub.cpp
index daf484ef9c..a9943016e5 100644
--- a/be/src/util/mysql_load_error_hub.cpp
+++ b/be/src/util/mysql_load_error_hub.cpp
@@ -18,6 +18,7 @@
 #include <mysql/mysql.h>
 
 #define __DorisMysql MYSQL
+#include "common/logging.h"
 #include "mysql_load_error_hub.h"
 #include "util/defer_op.h"
 
diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h
index 3059d9499e..b4b49d7b61 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -194,7 +194,7 @@ public:
     static constexpr int PREFETCH_STEP = 64;
 
     HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
-    ~HashJoinNode();
+    ~HashJoinNode() override;
 
     Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
     Status prepare(RuntimeState* state) override;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org