You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this rendering.
Posted to commits@doris.apache.org by pa...@apache.org on 2023/06/08 06:47:44 UTC

[doris] branch master updated: [Bug](pipeline) set SourceState to MORE_DATA when UnionSourceOperator have const_expr/data_queue->remaining_has_data (#20557)

This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 22985af4d7 [Bug](pipeline) set SourceState to MORE_DATA when UnionSourceOperator have const_expr/data_queue->remaining_has_data (#20557)
22985af4d7 is described below

commit 22985af4d702930d8d4b004d2fe7185832ed9960
Author: Pxl <px...@qq.com>
AuthorDate: Thu Jun 8 14:47:35 2023 +0800

    [Bug](pipeline) set SourceState to MORE_DATA when UnionSourceOperator have const_expr/data_queue->remaining_has_data (#20557)
    
    Set SourceState to MORE_DATA when UnionSourceOperator still needs to read its const exprs or data_queue->remaining_has_data() returns true.
---
 be/src/pipeline/exec/data_queue.cpp            | 22 +++++++++++-----------
 be/src/pipeline/exec/data_queue.h              | 10 +++++-----
 be/src/pipeline/exec/union_source_operator.cpp | 14 +++++++++-----
 be/src/pipeline/pipeline_fragment_context.cpp  | 13 -------------
 4 files changed, 25 insertions(+), 34 deletions(-)

diff --git a/be/src/pipeline/exec/data_queue.cpp b/be/src/pipeline/exec/data_queue.cpp
index 6dbf341972..6ec6a5b2b1 100644
--- a/be/src/pipeline/exec/data_queue.cpp
+++ b/be/src/pipeline/exec/data_queue.cpp
@@ -29,17 +29,17 @@
 namespace doris {
 namespace pipeline {
 
-DataQueue::DataQueue(int child_count) {
-    _child_count = child_count;
-    _flag_queue_idx = 0;
-    _queue_blocks.resize(child_count);
-    _free_blocks.resize(child_count);
-    _queue_blocks_lock.resize(child_count);
-    _free_blocks_lock.resize(child_count);
-    _is_finished.resize(child_count);
-    _is_canceled.resize(child_count);
-    _cur_bytes_in_queue.resize(child_count);
-    _cur_blocks_nums_in_queue.resize(child_count);
+DataQueue::DataQueue(int child_count)
+        : _queue_blocks_lock(child_count),
+          _queue_blocks(child_count),
+          _free_blocks_lock(child_count),
+          _free_blocks(child_count),
+          _child_count(child_count),
+          _is_finished(child_count),
+          _is_canceled(child_count),
+          _cur_bytes_in_queue(child_count),
+          _cur_blocks_nums_in_queue(child_count),
+          _flag_queue_idx(0) {
     for (int i = 0; i < child_count; ++i) {
         _queue_blocks_lock[i].reset(new std::mutex());
         _free_blocks_lock[i].reset(new std::mutex());
diff --git a/be/src/pipeline/exec/data_queue.h b/be/src/pipeline/exec/data_queue.h
index 299f7e6b4d..e7f2fdfada 100644
--- a/be/src/pipeline/exec/data_queue.h
+++ b/be/src/pipeline/exec/data_queue.h
@@ -68,14 +68,14 @@ private:
 
     //how many deque will be init, always will be one
     int _child_count = 0;
-    std::deque<std::atomic<bool>> _is_finished;
-    std::deque<std::atomic<bool>> _is_canceled;
+    std::vector<std::atomic_bool> _is_finished;
+    std::vector<std::atomic_bool> _is_canceled;
     // int64_t just for counter of profile
-    std::deque<std::atomic<int64_t>> _cur_bytes_in_queue;
-    std::deque<std::atomic<uint32_t>> _cur_blocks_nums_in_queue;
+    std::vector<std::atomic_int64_t> _cur_bytes_in_queue;
+    std::vector<std::atomic_uint32_t> _cur_blocks_nums_in_queue;
 
     //this will be indicate which queue has data, it's useful when have many queues
-    std::atomic<int> _flag_queue_idx = 0;
+    std::atomic_int _flag_queue_idx = 0;
     // only used by streaming agg source operator
     bool _data_exhausted = false;
 
diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp
index 83069f3e90..7311d849c8 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -83,11 +83,15 @@ Status UnionSourceOperator::get_block(RuntimeState* state, vectorized::Block* bl
             std::bind(&UnionSourceOperator::pull_data, this, std::placeholders::_1,
                       std::placeholders::_2, std::placeholders::_3)));
     //have exectue const expr, queue have no data any more, and child could be colsed
-    source_state = ((!_need_read_for_const_expr && !_data_queue->remaining_has_data() &&
-                     _data_queue->is_all_finish()) ||
-                    eos)
-                           ? SourceState::FINISHED
-                           : SourceState::DEPEND_ON_SOURCE;
+    if (eos || (!_need_read_for_const_expr && !_data_queue->remaining_has_data() &&
+                _data_queue->is_all_finish())) {
+        source_state = SourceState::FINISHED;
+    } else if (_need_read_for_const_expr || _data_queue->remaining_has_data()) {
+        source_state = SourceState::MORE_DATA;
+    } else {
+        source_state = SourceState::DEPEND_ON_SOURCE;
+    }
+
     return Status::OK();
 }
 } // namespace pipeline
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index ebb30fe384..827fa43a4e 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -107,19 +107,6 @@
 #include "vec/exec/vunion_node.h"
 #include "vec/runtime/vdata_stream_mgr.h"
 
-namespace apache {
-namespace thrift {
-class TException;
-
-namespace transport {
-class TTransportException;
-} // namespace transport
-} // namespace thrift
-} // namespace apache
-
-using apache::thrift::transport::TTransportException;
-using apache::thrift::TException;
-
 namespace doris::pipeline {
 
 PipelineFragmentContext::PipelineFragmentContext(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org