You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by li...@apache.org on 2022/12/13 07:33:15 UTC
[doris] branch master updated: [Refactor](exec) refactor the code of datasink eos logic (#15009)
This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 284a3351f4 [Refactor](exec) refactor the code of datasink eos logic (#15009)
284a3351f4 is described below
commit 284a3351f46db081865e5487cc605d8457c7d9c6
Author: HappenLee <ha...@hotmail.com>
AuthorDate: Tue Dec 13 15:33:08 2022 +0800
[Refactor](exec) refactor the code of datasink eos logic (#15009)
---
.../exec/nested_loop_join_probe_operator.h | 1 -
be/src/pipeline/exec/operator.h | 6 ++--
be/src/runtime/plan_fragment_executor.cpp | 42 +++++++++-------------
be/src/runtime/plan_fragment_executor.h | 3 +-
be/src/util/mysql_load_error_hub.cpp | 1 +
be/src/vec/exec/join/vhash_join_node.h | 2 +-
6 files changed, 23 insertions(+), 32 deletions(-)
diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h b/be/src/pipeline/exec/nested_loop_join_probe_operator.h
index 4de351e441..bcbf08111c 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h
@@ -1,4 +1,3 @@
-
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 46c79c8470..8196c8e1dc 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -278,10 +278,10 @@ public:
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override {
SCOPED_TIMER(_runtime_profile->total_time_counter());
- if (UNLIKELY(!in_block || in_block->rows() == 0)) {
- return Status::OK();
+ if (in_block->rows() > 0) {
+ return _sink->send(state, in_block, source_state == SourceState::FINISHED);
}
- return _sink->send(state, in_block, source_state == SourceState::FINISHED);
+ return Status::OK();
}
Status close(RuntimeState* state) override {
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index b21ce4c28f..29056701b9 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -216,7 +216,6 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
_fragment_cpu_timer = ADD_TIMER(profile(), "FragmentCpuTime");
_row_batch.reset(new RowBatch(_plan->row_desc(), _runtime_state->batch_size()));
- _block.reset(new doris::vectorized::Block());
// _row_batch->tuple_data_pool()->set_limits(*_runtime_state->mem_trackers());
VLOG_NOTICE << "plan_root=\n" << _plan->debug_string();
_prepared = true;
@@ -289,17 +288,15 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
{
auto sink_send_span_guard = Defer {[this]() { this->_sink->end_send_span(); }};
- while (true) {
- doris::vectorized::Block* block;
+ doris::vectorized::Block block;
+ bool eos = false;
+
+ while (!eos) {
RETURN_IF_CANCELLED(_runtime_state);
{
SCOPED_CPU_TIMER(_fragment_cpu_timer);
- RETURN_IF_ERROR(get_vectorized_internal(&block));
- }
-
- if (block == nullptr) {
- break;
+ RETURN_IF_ERROR(get_vectorized_internal(&block, &eos));
}
SCOPED_TIMER(profile()->total_time_counter());
@@ -309,11 +306,13 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
_collect_query_statistics();
}
- auto st = _sink->send(runtime_state(), block);
- if (st.is<END_OF_FILE>()) {
- break;
+ if (!eos || block.rows() > 0) {
+ auto st = _sink->send(runtime_state(), &block);
+ if (st.is<END_OF_FILE>()) {
+ break;
+ }
+ RETURN_IF_ERROR(st);
}
- RETURN_IF_ERROR(st);
}
}
@@ -338,27 +337,20 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
return Status::OK();
}
-Status PlanFragmentExecutor::get_vectorized_internal(::doris::vectorized::Block** block) {
- if (_done) {
- *block = nullptr;
- return Status::OK();
- }
-
+Status PlanFragmentExecutor::get_vectorized_internal(::doris::vectorized::Block* block, bool* eos) {
while (!_done) {
- _block->clear_column_data(_plan->row_desc().num_materialized_slots());
+ block->clear_column_data(_plan->row_desc().num_materialized_slots());
SCOPED_TIMER(profile()->total_time_counter());
RETURN_IF_ERROR_AND_CHECK_SPAN(
- _plan->get_next_after_projects(_runtime_state.get(), _block.get(), &_done),
+ _plan->get_next_after_projects(_runtime_state.get(), block, &_done),
_plan->get_next_span(), _done);
- if (_block->rows() > 0) {
- COUNTER_UPDATE(_rows_produced_counter, _block->rows());
- *block = _block.get();
+ if (block->rows() > 0) {
+ COUNTER_UPDATE(_rows_produced_counter, block->rows());
break;
}
-
- *block = nullptr;
}
+ *eos = _done;
return Status::OK();
}
diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h
index af08aff0e3..7abf9fdc89 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -188,7 +188,6 @@ private:
// Created in prepare (if required), owned by this object.
std::unique_ptr<DataSink> _sink;
std::unique_ptr<RowBatch> _row_batch;
- std::unique_ptr<doris::vectorized::Block> _block;
// Number of rows returned by this fragment
RuntimeProfile::Counter* _rows_produced_counter;
@@ -236,7 +235,7 @@ private:
// Executes get_next() logic and returns resulting status.
Status get_next_internal(RowBatch** batch);
- Status get_vectorized_internal(::doris::vectorized::Block** block);
+ Status get_vectorized_internal(::doris::vectorized::Block* block, bool* eos);
// Stops report thread, if one is running. Blocks until report thread terminates.
// Idempotent.
diff --git a/be/src/util/mysql_load_error_hub.cpp b/be/src/util/mysql_load_error_hub.cpp
index daf484ef9c..a9943016e5 100644
--- a/be/src/util/mysql_load_error_hub.cpp
+++ b/be/src/util/mysql_load_error_hub.cpp
@@ -18,6 +18,7 @@
#include <mysql/mysql.h>
#define __DorisMysql MYSQL
+#include "common/logging.h"
#include "mysql_load_error_hub.h"
#include "util/defer_op.h"
diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h
index 3059d9499e..b4b49d7b61 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -194,7 +194,7 @@ public:
static constexpr int PREFETCH_STEP = 64;
HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
- ~HashJoinNode();
+ ~HashJoinNode() override;
Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
Status prepare(RuntimeState* state) override;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org