You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by pa...@apache.org on 2023/06/08 06:47:44 UTC
[doris] branch master updated: [Bug](pipeline) set SourceState to MORE_DATA when UnionSourceOperator have const_expr/data_queue->remaining_has_data (#20557)
This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 22985af4d7 [Bug](pipeline) set SourceState to MORE_DATA when UnionSourceOperator have const_expr/data_queue->remaining_has_data (#20557)
22985af4d7 is described below
commit 22985af4d702930d8d4b004d2fe7185832ed9960
Author: Pxl <px...@qq.com>
AuthorDate: Thu Jun 8 14:47:35 2023 +0800
[Bug](pipeline) set SourceState to MORE_DATA when UnionSourceOperator have const_expr/data_queue->remaining_has_data (#20557)
Set SourceState to MORE_DATA (instead of DEPEND_ON_SOURCE) when UnionSourceOperator still has a const_expr to read (_need_read_for_const_expr) or _data_queue->remaining_has_data() is true.
---
be/src/pipeline/exec/data_queue.cpp | 22 +++++++++++-----------
be/src/pipeline/exec/data_queue.h | 10 +++++-----
be/src/pipeline/exec/union_source_operator.cpp | 14 +++++++++-----
be/src/pipeline/pipeline_fragment_context.cpp | 13 -------------
4 files changed, 25 insertions(+), 34 deletions(-)
diff --git a/be/src/pipeline/exec/data_queue.cpp b/be/src/pipeline/exec/data_queue.cpp
index 6dbf341972..6ec6a5b2b1 100644
--- a/be/src/pipeline/exec/data_queue.cpp
+++ b/be/src/pipeline/exec/data_queue.cpp
@@ -29,17 +29,17 @@
namespace doris {
namespace pipeline {
-DataQueue::DataQueue(int child_count) {
- _child_count = child_count;
- _flag_queue_idx = 0;
- _queue_blocks.resize(child_count);
- _free_blocks.resize(child_count);
- _queue_blocks_lock.resize(child_count);
- _free_blocks_lock.resize(child_count);
- _is_finished.resize(child_count);
- _is_canceled.resize(child_count);
- _cur_bytes_in_queue.resize(child_count);
- _cur_blocks_nums_in_queue.resize(child_count);
+DataQueue::DataQueue(int child_count)
+ : _queue_blocks_lock(child_count),
+ _queue_blocks(child_count),
+ _free_blocks_lock(child_count),
+ _free_blocks(child_count),
+ _child_count(child_count),
+ _is_finished(child_count),
+ _is_canceled(child_count),
+ _cur_bytes_in_queue(child_count),
+ _cur_blocks_nums_in_queue(child_count),
+ _flag_queue_idx(0) {
for (int i = 0; i < child_count; ++i) {
_queue_blocks_lock[i].reset(new std::mutex());
_free_blocks_lock[i].reset(new std::mutex());
diff --git a/be/src/pipeline/exec/data_queue.h b/be/src/pipeline/exec/data_queue.h
index 299f7e6b4d..e7f2fdfada 100644
--- a/be/src/pipeline/exec/data_queue.h
+++ b/be/src/pipeline/exec/data_queue.h
@@ -68,14 +68,14 @@ private:
//how many deque will be init, always will be one
int _child_count = 0;
- std::deque<std::atomic<bool>> _is_finished;
- std::deque<std::atomic<bool>> _is_canceled;
+ std::vector<std::atomic_bool> _is_finished;
+ std::vector<std::atomic_bool> _is_canceled;
// int64_t just for counter of profile
- std::deque<std::atomic<int64_t>> _cur_bytes_in_queue;
- std::deque<std::atomic<uint32_t>> _cur_blocks_nums_in_queue;
+ std::vector<std::atomic_int64_t> _cur_bytes_in_queue;
+ std::vector<std::atomic_uint32_t> _cur_blocks_nums_in_queue;
//this will be indicate which queue has data, it's useful when have many queues
- std::atomic<int> _flag_queue_idx = 0;
+ std::atomic_int _flag_queue_idx = 0;
// only used by streaming agg source operator
bool _data_exhausted = false;
diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp
index 83069f3e90..7311d849c8 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -83,11 +83,15 @@ Status UnionSourceOperator::get_block(RuntimeState* state, vectorized::Block* bl
std::bind(&UnionSourceOperator::pull_data, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3)));
//have exectue const expr, queue have no data any more, and child could be colsed
- source_state = ((!_need_read_for_const_expr && !_data_queue->remaining_has_data() &&
- _data_queue->is_all_finish()) ||
- eos)
- ? SourceState::FINISHED
- : SourceState::DEPEND_ON_SOURCE;
+ if (eos || (!_need_read_for_const_expr && !_data_queue->remaining_has_data() &&
+ _data_queue->is_all_finish())) {
+ source_state = SourceState::FINISHED;
+ } else if (_need_read_for_const_expr || _data_queue->remaining_has_data()) {
+ source_state = SourceState::MORE_DATA;
+ } else {
+ source_state = SourceState::DEPEND_ON_SOURCE;
+ }
+
return Status::OK();
}
} // namespace pipeline
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index ebb30fe384..827fa43a4e 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -107,19 +107,6 @@
#include "vec/exec/vunion_node.h"
#include "vec/runtime/vdata_stream_mgr.h"
-namespace apache {
-namespace thrift {
-class TException;
-
-namespace transport {
-class TTransportException;
-} // namespace transport
-} // namespace thrift
-} // namespace apache
-
-using apache::thrift::transport::TTransportException;
-using apache::thrift::TException;
-
namespace doris::pipeline {
PipelineFragmentContext::PipelineFragmentContext(
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org