You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by "Mryange (via GitHub)" <gi...@apache.org> on 2023/11/12 12:57:47 UTC

[PR] [only test] default open pipelineX [doris]

Mryange opened a new pull request, #26840:
URL: https://github.com/apache/doris/pull/26840

   ## Proposed changes
   
   Issue Number: close #xxx
   
   <!--Describe your changes.-->
   
   ## Further comments
   
   If this is a relatively large or complex change, kick off the discussion at [dev@doris.apache.org](mailto:dev@doris.apache.org) by explaining why you chose the solution you did and what alternatives you considered, etc...
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


Re: [PR] [only test] default open pipelineX [doris]

Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26840:
URL: https://github.com/apache/doris/pull/26840#issuecomment-1807199154

   
   <details>
   <summary>TPC-H test result on machine: 'aliyun_ecs.c7a.8xlarge_32C64G'</summary>
   
   ```
   Tpch sf100 test result on commit 7ad8a302e8247805829381737c2eeed58d3f1759, data reload: false
   
   run tpch-sf100 query with default conf and session variables
   q1	5310	5114	5073	5073
   q2	366	203	196	196
   q3	2077	2090	2068	2068
   q4	1483	1424	1428	1424
   q5	4121	4145	4083	4083
   q6	252	133	134	133
   q7	2058	1593	1610	1593
   q8	2767	2746	2759	2746
   q9	10446	10276	10274	10274
   q10	10268	3596	3551	3551
   q11	369	246	257	246
   q12	471	292	300	292
   q13	4534	4104	4162	4104
   q14	318	287	285	285
   q15	665	589	585	585
   q16	704	625	602	602
   q17	1138	1103	1097	1097
   q18	7892	7414	7378	7378
   q19	1692	1699	1718	1699
   q20	593	364	368	364
   q21	4920	4576	4562	4562
   q22	534	451	437	437
   Total cold run time: 62978 ms
   Total hot run time: 52792 ms
   
   run tpch-sf100 query with default conf and set session variable runtime_filter_mode=off
   q1	5077	5042	4965	4965
   q2	344	231	268	231
   q3	4022	3979	3978	3978
   q4	2799	2766	2761	2761
   q5	6519	6532	6423	6423
   q6	244	129	126	126
   q7	3168	2651	2763	2651
   q8	4803	4778	4724	4724
   q9	17787	17688	17747	17688
   q10	4097	4154	4159	4154
   q11	729	637	645	637
   q12	1033	812	836	812
   q13	4339	3926	3905	3905
   q14	395	355	354	354
   q15	643	602	580	580
   q16	753	726	707	707
   q17	3896	3956	3916	3916
   q18	9497	9191	9413	9191
   q19	1767	1790	1776	1776
   q20	2412	2075	2054	2054
   q21	8988	8933	8838	8838
   q22	918	840	879	840
   Total cold run time: 84230 ms
   Total hot run time: 81311 ms
   ```
   </details>
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


Re: [PR] [only test] default open pipelineX [doris]

Posted by "Mryange (via GitHub)" <gi...@apache.org>.
Mryange commented on PR #26840:
URL: https://github.com/apache/doris/pull/26840#issuecomment-1807120376

   run buildall


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


Re: [PR] [only test] default open pipelineX [doris]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on code in PR #26840:
URL: https://github.com/apache/doris/pull/26840#discussion_r1390406863


##########
be/src/pipeline/exec/multi_cast_data_streamer.cpp:
##########
@@ -87,10 +92,39 @@ Status MultiCastDataStreamer::push(RuntimeState* state, doris::vectorized::Block
     for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
         if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) {
             _sender_pos_to_read[i] = end;
+            _set_ready_for_read(i);
         }
     }
     _eos = eos;
     return Status::OK();
 }
 
+void MultiCastDataStreamer::_set_ready_for_read(int sender_idx) {

Review Comment:
   warning: method '_set_ready_for_read' can be made const [readability-make-member-function-const]
   
   ```suggestion
   void MultiCastDataStreamer::_set_ready_for_read(int sender_idx) const {
   ```
   
   be/src/pipeline/exec/multi_cast_data_streamer.h:79:
   ```diff
   -     void _set_ready_for_read(int sender_idx);
   +     void _set_ready_for_read(int sender_idx) const;
   ```
   



##########
be/src/pipeline/exec/multi_cast_data_streamer.cpp:
##########
@@ -87,10 +92,39 @@
     for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
         if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) {
             _sender_pos_to_read[i] = end;
+            _set_ready_for_read(i);
         }
     }
     _eos = eos;
     return Status::OK();
 }
 
+void MultiCastDataStreamer::_set_ready_for_read(int sender_idx) {
+    if (!_has_dependencys) {
+        return;
+    }
+    auto* dep = _dependencys[sender_idx];
+    DCHECK(dep);
+    dep->set_ready_for_read();
+}
+
+void MultiCastDataStreamer::_set_ready_for_read() {

Review Comment:
   warning: method '_set_ready_for_read' can be made const [readability-make-member-function-const]
   
   ```suggestion
   void MultiCastDataStreamer::_set_ready_for_read() const {
   ```
   
   be/src/pipeline/exec/multi_cast_data_streamer.h:80:
   ```diff
   -     void _set_ready_for_read();
   +     void _set_ready_for_read() const;
   ```
   



##########
be/src/pipeline/pipeline_x/dependency.h:
##########
@@ -899,7 +895,30 @@ struct LocalExchangeSharedState {
 public:
     ENABLE_FACTORY_CREATOR(LocalExchangeSharedState);
     std::vector<moodycamel::ConcurrentQueue<PartitionedBlock>> data_queue;
+    std::vector<Dependency*> source_dependencies;
     std::atomic<int> running_sink_operators = 0;
+    void add_running_sink_operators() { running_sink_operators++; }
+    void sub_running_sink_operators() {
+        running_sink_operators--;
+        if (running_sink_operators == 0) {
+            _set_ready_for_read();
+        }
+    }
+    void _set_ready_for_read() {
+        for (auto* dep : source_dependencies) {
+            DCHECK(dep);
+            dep->set_ready_for_read();
+        }
+    }
+    void set_dep_by_channel_id(Dependency* dep, int channel_id) {

Review Comment:
   warning: method 'set_dep_by_channel_id' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
       static void set_dep_by_channel_id(Dependency* dep, int channel_id) {
   ```
   



##########
be/src/pipeline/exec/multi_cast_data_streamer.cpp:
##########
@@ -87,10 +92,39 @@
     for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
         if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) {
             _sender_pos_to_read[i] = end;
+            _set_ready_for_read(i);
         }
     }
     _eos = eos;
     return Status::OK();
 }
 
+void MultiCastDataStreamer::_set_ready_for_read(int sender_idx) {
+    if (!_has_dependencys) {
+        return;
+    }
+    auto* dep = _dependencys[sender_idx];
+    DCHECK(dep);
+    dep->set_ready_for_read();
+}
+
+void MultiCastDataStreamer::_set_ready_for_read() {
+    if (!_has_dependencys) {
+        return;
+    }
+    for (auto* dep : _dependencys) {
+        DCHECK(dep);
+        dep->set_ready_for_read();
+    }
+}
+
+void MultiCastDataStreamer::_block_reading(int sender_idx) {

Review Comment:
   warning: method '_block_reading' can be made const [readability-make-member-function-const]
   
   ```suggestion
   void MultiCastDataStreamer::_block_reading(int sender_idx) const {
   ```
   
   be/src/pipeline/exec/multi_cast_data_streamer.h:81:
   ```diff
   -     void _block_reading(int sender_idx);
   +     void _block_reading(int sender_idx) const;
   ```
   



##########
be/src/pipeline/pipeline_x/dependency.h:
##########
@@ -899,7 +895,30 @@
 public:
     ENABLE_FACTORY_CREATOR(LocalExchangeSharedState);
     std::vector<moodycamel::ConcurrentQueue<PartitionedBlock>> data_queue;
+    std::vector<Dependency*> source_dependencies;
     std::atomic<int> running_sink_operators = 0;
+    void add_running_sink_operators() { running_sink_operators++; }
+    void sub_running_sink_operators() {
+        running_sink_operators--;
+        if (running_sink_operators == 0) {
+            _set_ready_for_read();
+        }
+    }
+    void _set_ready_for_read() {
+        for (auto* dep : source_dependencies) {
+            DCHECK(dep);
+            dep->set_ready_for_read();
+        }
+    }
+    void set_dep_by_channel_id(Dependency* dep, int channel_id) {
+        source_dependencies[channel_id] = dep;
+        dep->block_reading();
+    }
+    void _set_ready_for_read(int channel_id) {

Review Comment:
   warning: method '_set_ready_for_read' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
       static void _set_ready_for_read(int channel_id) {
   ```
   



##########
be/src/pipeline/pipeline_x/pipeline_x_task.h:
##########
@@ -70,56 +70,64 @@
     // must be call after all pipeline task is finish to release resource
     Status close(Status exec_status) override;
 
+    Dependency* read_blocked_dependency() {
+        for (auto* op_dep : _read_dependencies) {
+            _blocked_dep = op_dep->read_blocked_by();
+            if (_blocked_dep != nullptr) {
+                _blocked_dep->start_read_watcher();
+                return _blocked_dep;
+            }
+        }
+        return nullptr;
+    }
+
     bool source_can_read() override {
         if (_dry_run) {
             return true;
         }
-        for (auto* op_dep : _read_dependencies) {
-            auto* dep = op_dep->read_blocked_by();
-            if (dep != nullptr) {
-                dep->start_read_watcher();
-                push_blocked_task_to_dependency(dep);
-                return false;
-            }
+        return read_blocked_dependency() == nullptr;
+    }
+
+    Dependency* filter_blocked_dependency() {
+        _blocked_dep = _filter_dependency->filter_blocked_by();
+        if (_blocked_dep != nullptr) {
+            return _blocked_dep;
         }
-        return true;
+        return nullptr;
     }
 
     bool runtime_filters_are_ready_or_timeout() override {
-        auto* dep = _filter_dependency->filter_blocked_by();
-        if (dep != nullptr) {
-            push_blocked_task_to_dependency(dep);
-            return false;
-        }
-        return true;
+        return filter_blocked_dependency() == nullptr;
     }
 
-    bool sink_can_write() override {
-        auto* dep = _write_dependencies->write_blocked_by();
-        if (dep != nullptr) {
-            dep->start_write_watcher();
-            push_blocked_task_to_dependency(dep);
-            return false;
+    Dependency* write_blocked_dependency() {
+        _blocked_dep = _write_dependencies->write_blocked_by();
+        if (_blocked_dep != nullptr) {
+            static_cast<WriteDependency*>(_blocked_dep)->start_write_watcher();
+            return _blocked_dep;
         }
-        return true;
+        return nullptr;
     }
 
+    bool sink_can_write() override { return write_blocked_dependency() == nullptr; }
+
     Status finalize() override;
 
     std::string debug_string() override;
 
-    bool is_pending_finish() override {
+    Dependency* finish_blocked_dependency() {

Review Comment:
   warning: method 'finish_blocked_dependency' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
       static Dependency* finish_blocked_dependency() {
   ```
   



##########
be/src/pipeline/pipeline_x/pipeline_x_task.h:
##########
@@ -70,56 +70,64 @@ class PipelineXTask : public PipelineTask {
     // must be call after all pipeline task is finish to release resource
     Status close(Status exec_status) override;
 
+    Dependency* read_blocked_dependency() {

Review Comment:
   warning: method 'read_blocked_dependency' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
       static Dependency* read_blocked_dependency() {
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


Re: [PR] [only test] default open pipelineX [doris]

Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26840:
URL: https://github.com/apache/doris/pull/26840#issuecomment-1807201598

   (From new machine)TeamCity pipeline, clickbench performance test result:
    the sum of best hot time: 46.43 seconds
    stream load tsv:          551 seconds loaded 74807831229 Bytes, about 129 MB/s
    stream load json:         20 seconds loaded 2358488459 Bytes, about 112 MB/s
    stream load orc:          65 seconds loaded 1101869774 Bytes, about 16 MB/s
    stream load parquet:          32 seconds loaded 861443392 Bytes, about 25 MB/s
    insert into select:          29.1 seconds inserted 10000000 Rows, about 343K ops/s
    storage size: 17162135412 Bytes


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


Re: [PR] [only test] default open pipelineX [doris]

Posted by "Mryange (via GitHub)" <gi...@apache.org>.
Mryange commented on PR #26840:
URL: https://github.com/apache/doris/pull/26840#issuecomment-1807144637

   run p0 10


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


Re: [PR] [only test] default open pipelineX [doris]

Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26840:
URL: https://github.com/apache/doris/pull/26840#issuecomment-1807385419

   TeamCity be ut coverage result:
    Function Coverage: 36.75% (8407/22876) 
    Line Coverage: 29.27% (68169/232909)
    Region Coverage: 27.91% (35228/126234)
    Branch Coverage: 24.73% (18016/72848)
    Coverage Report: http://coverage.selectdb-in.cc/coverage/7ad8a302e8247805829381737c2eeed58d3f1759_7ad8a302e8247805829381737c2eeed58d3f1759/report/index.html


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


Re: [PR] [only test] default open pipelineX [doris]

Posted by "Mryange (via GitHub)" <gi...@apache.org>.
Mryange closed pull request #26840: [only test] default  open pipelineX
URL: https://github.com/apache/doris/pull/26840


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org