You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by kx...@apache.org on 2023/06/09 04:10:53 UTC

[doris] 17/29: [conf](pipeline) turn pipeline on by default (#20458)

This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0-beta
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 5e51389083ba540ae3270f8c6b38147d0cc2fada
Author: Gabriel <ga...@gmail.com>
AuthorDate: Thu Jun 8 09:20:51 2023 +0800

    [conf](pipeline) turn pipeline on by default (#20458)
---
 .../src/main/java/org/apache/doris/common/Config.java         |  2 +-
 .../src/main/java/org/apache/doris/analysis/SetVar.java       |  8 ++++++++
 .../src/main/java/org/apache/doris/qe/SessionVariable.java    | 11 +++++++++--
 .../java/org/apache/doris/statistics/util/StatisticsUtil.java |  1 +
 .../src/test/java/org/apache/doris/planner/QueryPlanTest.java |  3 ---
 .../src/test/java/org/apache/doris/qe/VariableMgrTest.java    |  2 +-
 .../suites/correctness_p0/test_colocate_join.groovy           |  2 +-
 .../correctness_p0/test_null_aware_left_anti_join.groovy      |  1 +
 .../external_table_emr_p2/hive/test_external_github.groovy    |  8 ++++----
 regression-test/suites/mtmv_p0/ssb/ddl/ssb_q41_create.sql     |  2 +-
 regression-test/suites/mtmv_p0/ssb/ddl/ssb_q42_create.sql     |  2 +-
 regression-test/suites/mtmv_p0/ssb/ddl/ssb_q43_create.sql     |  2 +-
 .../nereids_p0/limit/sql/useRestrictiveLimitFromSubq.sql      |  2 +-
 .../suites/nereids_p0/limit/sql/withGroupByInSubq.sql         |  2 +-
 .../suites/nereids_p0/limit/sql/withJoinInSubq.sql            |  2 +-
 regression-test/suites/nereids_p0/limit/sql/withSubq.sql      |  2 +-
 .../bitmap_functions/test_bitmap_function.groovy              |  4 ++--
 .../sql_functions/conditional_functions/test_nullif.groovy    | 10 +++++-----
 .../window_functions/test_window_function.groovy              |  8 ++++----
 .../suites/nereids_syntax_p0/null_aware_left_anti_join.groovy |  1 +
 .../suites/nereids_tpcds_shape_sf100_p0/ddl/case.tmpl         |  3 ++-
 21 files changed, 47 insertions(+), 31 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index f1ae503ed1..e0e741a8a1 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1486,7 +1486,7 @@ public class Config extends ConfigBase {
     public static boolean enable_quantile_state_type = true;
 
     @ConfField
-    public static boolean enable_pipeline_load = false;
+    public static boolean enable_pipeline_load = true;
 
     // enable_workload_group should be immutable and temporarily set to mutable during the development test phase
     @ConfField(mutable = true, masterOnly = true, expType = ExperimentalType.EXPERIMENTAL)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java
index 8afa942dba..a0d61248a5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java
@@ -187,6 +187,14 @@ public class SetVar {
                         instanceNum + "(Should not be set to more than " + Config.max_instance_num + ")");
             }
         }
+        if (getVariable().equalsIgnoreCase(SessionVariable.PARALLEL_PIPELINE_TASK_NUM)) {
+            int instanceNum = Integer.parseInt(getValue().getStringValue());
+            if (instanceNum > Config.max_instance_num) {
+                ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_VALUE_FOR_VAR,
+                        SessionVariable.PARALLEL_PIPELINE_TASK_NUM,
+                        instanceNum + "(Should not be set to more than " + Config.max_instance_num + ")");
+            }
+        }
         if (getVariable().equalsIgnoreCase(SessionVariable.EXEC_MEM_LIMIT)) {
             this.value = new StringLiteral(Long.toString(ParseUtil.analyzeDataVolumn(getResult().getStringValue())));
             this.result = (LiteralExpr) this.value;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index e1fc2b973f..3afbbaeb6d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -104,6 +104,7 @@ public class SessionVariable implements Serializable, Writable {
     public static final String ENABLE_COLOCATE_SCAN = "enable_colocate_scan";
     public static final String ENABLE_BUCKET_SHUFFLE_JOIN = "enable_bucket_shuffle_join";
     public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM = "parallel_fragment_exec_instance_num";
+    public static final String PARALLEL_PIPELINE_TASK_NUM = "parallel_pipeline_task_num";
     public static final String ENABLE_INSERT_STRICT = "enable_insert_strict";
     public static final String ENABLE_SPILLING = "enable_spilling";
     public static final String ENABLE_EXCHANGE_NODE_PARALLEL_MERGE = "enable_exchange_node_parallel_merge";
@@ -531,6 +532,9 @@ public class SessionVariable implements Serializable, Writable {
     @VariableMgr.VarAttr(name = PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM, fuzzy = true)
     public int parallelExecInstanceNum = 1;
 
+    @VariableMgr.VarAttr(name = PARALLEL_PIPELINE_TASK_NUM, fuzzy = true)
+    public int parallelPipelineTaskNum = 0;
+
     @VariableMgr.VarAttr(name = ENABLE_INSERT_STRICT, needForward = true)
     public boolean enableInsertStrict = true;
 
@@ -597,7 +601,7 @@ public class SessionVariable implements Serializable, Writable {
     public boolean enableVectorizedEngine = true;
 
     @VariableMgr.VarAttr(name = ENABLE_PIPELINE_ENGINE, fuzzy = true, expType = ExperimentalType.EXPERIMENTAL)
-    public boolean enablePipelineEngine = false;
+    public boolean enablePipelineEngine = true;
 
     @VariableMgr.VarAttr(name = ENABLE_PARALLEL_OUTFILE)
     public boolean enableParallelOutfile = false;
@@ -957,6 +961,7 @@ public class SessionVariable implements Serializable, Writable {
     public void initFuzzyModeVariables() {
         Random random = new Random(System.currentTimeMillis());
         this.parallelExecInstanceNum = random.nextInt(8) + 1;
+        this.parallelPipelineTaskNum = random.nextInt(8);
         this.enableCommonExprPushdown = random.nextBoolean();
         this.enableLocalExchange = random.nextBoolean();
         // This will cause be dead loop, disable it first
@@ -1331,9 +1336,11 @@ public class SessionVariable implements Serializable, Writable {
     }
 
     public int getParallelExecInstanceNum() {
-        if (enablePipelineEngine && parallelExecInstanceNum == 0) {
+        if (enablePipelineEngine && parallelPipelineTaskNum == 0) {
             Backend.BeInfoCollector beinfoCollector = Backend.getBeInfoCollector();
             return beinfoCollector.getParallelExecInstanceNum();
+        } else if (enablePipelineEngine) {
+            return parallelPipelineTaskNum;
         } else {
             return parallelExecInstanceNum;
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index fed9d26165..8f77df2748 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -144,6 +144,7 @@ public class StatisticsUtil {
         sessionVariable.setMaxExecMemByte(StatisticConstants.STATISTICS_MAX_MEM_PER_QUERY_IN_BYTES);
         sessionVariable.setEnableInsertStrict(true);
         sessionVariable.parallelExecInstanceNum = StatisticConstants.STATISTIC_PARALLEL_EXEC_INSTANCE_NUM;
+        sessionVariable.parallelPipelineTaskNum = StatisticConstants.STATISTIC_PARALLEL_EXEC_INSTANCE_NUM;
         sessionVariable.setEnableNereidsPlanner(false);
         sessionVariable.enableProfile = false;
         connectContext.setEnv(Env.getCurrentEnv());
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
index 70b7f2d66d..79f2b50871 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
@@ -1586,9 +1586,6 @@ public class QueryPlanTest extends TestWithFeService {
         String sql = "SELECT dt, dis_key, COUNT(1) FROM table_unpartitioned  group by dt, dis_key";
         String explainString = getSQLPlanOrErrorMsg("EXPLAIN " + sql);
         Assert.assertTrue(explainString.contains("AGGREGATE (update finalize)"));
-        sql = "SELECT dt, dis_key, COUNT(1) FROM table_partitioned  group by dt, dis_key";
-        explainString = getSQLPlanOrErrorMsg("EXPLAIN " + sql);
-        Assert.assertTrue(explainString.contains("AGGREGATE (update serialize)"));
     }
 
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
index f1b8e7b5a7..d08dc94454 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
@@ -90,7 +90,7 @@ public class VariableMgrTest {
         var = VariableMgr.newSessionVariable();
         Assert.assertEquals(1234L, var.getMaxExecMemByte());
 
-        stmt = (SetStmt) UtFrameUtils.parseAndAnalyzeStmt("set global parallel_fragment_exec_instance_num=5", ctx);
+        stmt = (SetStmt) UtFrameUtils.parseAndAnalyzeStmt("set global parallel_pipeline_task_num=5", ctx);
         executor = new SetExecutor(ctx, stmt);
         executor.execute();
         Assert.assertEquals(1L, var.getParallelExecInstanceNum());
diff --git a/regression-test/suites/correctness_p0/test_colocate_join.groovy b/regression-test/suites/correctness_p0/test_colocate_join.groovy
index 53ed6e9843..44e401235b 100644
--- a/regression-test/suites/correctness_p0/test_colocate_join.groovy
+++ b/regression-test/suites/correctness_p0/test_colocate_join.groovy
@@ -216,7 +216,7 @@ suite("test_colocate_join") {
             (20220101, 101, 202, 200, 100);"""
 
     explain {
-        sql("select /*+SET_VAR(parallel_fragment_exec_instance_num=1)*/ " +
+        sql("select /*+SET_VAR(parallel_fragment_exec_instance_num=1,parallel_pipeline_task_num=1)*/ " +
                 " sum_col1,sum_col2 " +
                 "from " +
                 "(select datekey,sum(sum_col1) as sum_col1  from test_query_colocate where datekey=20220101 group by datekey) t1 " +
diff --git a/regression-test/suites/correctness_p0/test_null_aware_left_anti_join.groovy b/regression-test/suites/correctness_p0/test_null_aware_left_anti_join.groovy
index e74ed27ba3..f732b6bda5 100644
--- a/regression-test/suites/correctness_p0/test_null_aware_left_anti_join.groovy
+++ b/regression-test/suites/correctness_p0/test_null_aware_left_anti_join.groovy
@@ -57,6 +57,7 @@ suite("test_null_aware_left_anti_join") {
     qt_select """ select ${tableName2}.k1 from ${tableName2} where k1 not in (select ${tableName1}.k1 from ${tableName1}) order by ${tableName2}.k1; """
 
     sql """ set parallel_fragment_exec_instance_num=2; """
+    sql """ set parallel_pipeline_task_num=2; """
     qt_select """ select ${tableName2}.k1 from ${tableName2} where k1 not in (select ${tableName1}.k1 from ${tableName1}) order by ${tableName2}.k1; """
 
     sql """
diff --git a/regression-test/suites/external_table_emr_p2/hive/test_external_github.groovy b/regression-test/suites/external_table_emr_p2/hive/test_external_github.groovy
index 7cd488870f..4d3e636836 100644
--- a/regression-test/suites/external_table_emr_p2/hive/test_external_github.groovy
+++ b/regression-test/suites/external_table_emr_p2/hive/test_external_github.groovy
@@ -269,8 +269,8 @@ suite("test_external_github", "p2") {
         GROUP BY repo_name
         ORDER BY count() DESC
         LIMIT 50"""
-    def repositoriesWithClickhouse_related_comments1 = """SELECT /*+SET_VAR(exec_mem_limit=8589934592, parallel_fragment_exec_instance_num=8, query_timeout=600) */repo_name, count() FROM github_eventsSUFFIX WHERE lower(body) LIKE '%clickhouse%' GROUP BY repo_name ORDER BY count() DESC, repo_name ASC LIMIT 50"""
-    def repositoriesWithClickhouse_related_comments2 = """SELECT /*+SET_VAR(exec_mem_limit=8589934592, parallel_fragment_exec_instance_num=8, query_timeout=600) */
+    def repositoriesWithClickhouse_related_comments1 = """SELECT /*+SET_VAR(exec_mem_limit=8589934592, parallel_fragment_exec_instance_num=8, parallel_pipeline_task_num=8, query_timeout=600) */repo_name, count() FROM github_eventsSUFFIX WHERE lower(body) LIKE '%clickhouse%' GROUP BY repo_name ORDER BY count() DESC, repo_name ASC LIMIT 50"""
+    def repositoriesWithClickhouse_related_comments2 = """SELECT /*+SET_VAR(exec_mem_limit=8589934592, parallel_fragment_exec_instance_num=8, parallel_pipeline_task_num=8, query_timeout=600) */
             repo_name,
             sum(num_star) AS num_stars,
             sum(num_comment) AS num_comments
@@ -287,8 +287,8 @@ suite("test_external_github", "p2") {
         HAVING num_comments > 0
         ORDER BY num_stars DESC,num_comments DESC,repo_name ASC
         LIMIT 50"""
-    def repositoriesWithDoris_related_comments1 = """SELECT /*+SET_VAR(exec_mem_limit=8589934592, parallel_fragment_exec_instance_num=8, query_timeout=600) */repo_name, count() FROM github_eventsSUFFIX WHERE lower(body) LIKE '%doris%' GROUP BY repo_name ORDER BY count() DESC, repo_name ASC LIMIT 50"""
-    def repositoriesWithDoris_related_comments2 = """SELECT /*+SET_VAR(exec_mem_limit=8589934592, parallel_fragment_exec_instance_num=8, query_timeout=600) */
+    def repositoriesWithDoris_related_comments1 = """SELECT /*+SET_VAR(exec_mem_limit=8589934592, parallel_fragment_exec_instance_num=8, parallel_pipeline_task_num=8, query_timeout=600) */repo_name, count() FROM github_eventsSUFFIX WHERE lower(body) LIKE '%doris%' GROUP BY repo_name ORDER BY count() DESC, repo_name ASC LIMIT 50"""
+    def repositoriesWithDoris_related_comments2 = """SELECT /*+SET_VAR(exec_mem_limit=8589934592, parallel_fragment_exec_instance_num=8, parallel_pipeline_task_num=8, query_timeout=600) */
             repo_name,
             sum(num_star) AS num_stars,
             sum(num_comment) AS num_comments
diff --git a/regression-test/suites/mtmv_p0/ssb/ddl/ssb_q41_create.sql b/regression-test/suites/mtmv_p0/ssb/ddl/ssb_q41_create.sql
index 20f6b1248d..33626acf62 100644
--- a/regression-test/suites/mtmv_p0/ssb/ddl/ssb_q41_create.sql
+++ b/regression-test/suites/mtmv_p0/ssb/ddl/ssb_q41_create.sql
@@ -3,7 +3,7 @@ BUILD IMMEDIATE REFRESH COMPLETE
 DISTRIBUTED BY HASH(c_nation) BUCKETS 6
 PROPERTIES ('replication_num' = '1')
 AS
-SELECT /*+SET_VAR(parallel_fragment_exec_instance_num=4, batch_size=4096) */
+SELECT /*+SET_VAR(parallel_fragment_exec_instance_num=4, parallel_pipeline_task_num=4, batch_size=4096) */
     d_year,
     c_nation,
     SUM(lo_revenue - lo_supplycost) AS PROFIT
diff --git a/regression-test/suites/mtmv_p0/ssb/ddl/ssb_q42_create.sql b/regression-test/suites/mtmv_p0/ssb/ddl/ssb_q42_create.sql
index f7cb84d01a..a6db33e951 100644
--- a/regression-test/suites/mtmv_p0/ssb/ddl/ssb_q42_create.sql
+++ b/regression-test/suites/mtmv_p0/ssb/ddl/ssb_q42_create.sql
@@ -3,7 +3,7 @@ BUILD IMMEDIATE REFRESH COMPLETE
 DISTRIBUTED BY HASH(s_nation, p_category) BUCKETS 6
 PROPERTIES ('replication_num' = '1')
 AS
-SELECT /*+SET_VAR(parallel_fragment_exec_instance_num=2, batch_size=4096) */  
+SELECT /*+SET_VAR(parallel_fragment_exec_instance_num=2, parallel_pipeline_task_num=2, batch_size=4096) */
     d_year,
     s_nation,
     p_category,
diff --git a/regression-test/suites/mtmv_p0/ssb/ddl/ssb_q43_create.sql b/regression-test/suites/mtmv_p0/ssb/ddl/ssb_q43_create.sql
index 264fa42279..7f8d4311cb 100644
--- a/regression-test/suites/mtmv_p0/ssb/ddl/ssb_q43_create.sql
+++ b/regression-test/suites/mtmv_p0/ssb/ddl/ssb_q43_create.sql
@@ -3,7 +3,7 @@ BUILD IMMEDIATE REFRESH COMPLETE
 DISTRIBUTED BY HASH(s_city, p_brand) BUCKETS 6
 PROPERTIES ('replication_num' = '1')
 AS
-SELECT /*+SET_VAR(parallel_fragment_exec_instance_num=2, batch_size=4096) */
+SELECT /*+SET_VAR(parallel_fragment_exec_instance_num=2, parallel_pipeline_task_num=2, batch_size=4096) */
     d_year,
     s_city,
     p_brand,
diff --git a/regression-test/suites/nereids_p0/limit/sql/useRestrictiveLimitFromSubq.sql b/regression-test/suites/nereids_p0/limit/sql/useRestrictiveLimitFromSubq.sql
index b9db067d77..b538756492 100644
--- a/regression-test/suites/nereids_p0/limit/sql/useRestrictiveLimitFromSubq.sql
+++ b/regression-test/suites/nereids_p0/limit/sql/useRestrictiveLimitFromSubq.sql
@@ -1,3 +1,3 @@
 -- database: presto; groups: limit; tables: nation
-SELECT /*+SET_VAR(parallel_fragment_exec_instance_num=2) */
+SELECT /*+SET_VAR(parallel_fragment_exec_instance_num=2, parallel_pipeline_task_num=2) */
 COUNT(*) FROM (SELECT * FROM tpch_tiny_nation LIMIT 2) AS foo LIMIT 5
diff --git a/regression-test/suites/nereids_p0/limit/sql/withGroupByInSubq.sql b/regression-test/suites/nereids_p0/limit/sql/withGroupByInSubq.sql
index 0b53dc0cab..d097ee3c4d 100644
--- a/regression-test/suites/nereids_p0/limit/sql/withGroupByInSubq.sql
+++ b/regression-test/suites/nereids_p0/limit/sql/withGroupByInSubq.sql
@@ -1,5 +1,5 @@
 -- database: presto; groups: limit; tables: partsupp
-SELECT /*+SET_VAR(parallel_fragment_exec_instance_num=2) */ 
+SELECT /*+SET_VAR(parallel_fragment_exec_instance_num=2, parallel_pipeline_task_num=2) */
 COUNT(*) FROM (
     SELECT suppkey, COUNT(*) FROM tpch_tiny_partsupp
     GROUP BY suppkey LIMIT 20) t1
diff --git a/regression-test/suites/nereids_p0/limit/sql/withJoinInSubq.sql b/regression-test/suites/nereids_p0/limit/sql/withJoinInSubq.sql
index 444bccf623..be696d9dc0 100644
--- a/regression-test/suites/nereids_p0/limit/sql/withJoinInSubq.sql
+++ b/regression-test/suites/nereids_p0/limit/sql/withJoinInSubq.sql
@@ -1,2 +1,2 @@
-SELECT /*+SET_VAR(parallel_fragment_exec_instance_num=2) */
+SELECT /*+SET_VAR(parallel_fragment_exec_instance_num=2, parallel_pipeline_task_num=2) */
 COUNT(*) FROM (SELECT n1.regionkey, n1.nationkey FROM tpch_tiny_nation n1 JOIN tpch_tiny_nation n2 ON n1.regionkey = n2.regionkey LIMIT 5) foo
diff --git a/regression-test/suites/nereids_p0/limit/sql/withSubq.sql b/regression-test/suites/nereids_p0/limit/sql/withSubq.sql
index 3cc941a518..00ccede952 100644
--- a/regression-test/suites/nereids_p0/limit/sql/withSubq.sql
+++ b/regression-test/suites/nereids_p0/limit/sql/withSubq.sql
@@ -1,3 +1,3 @@
 -- database: presto; groups: limit; tables: nation
-SELECT /*+SET_VAR(parallel_fragment_exec_instance_num=2) */
+SELECT /*+SET_VAR(parallel_fragment_exec_instance_num=2, parallel_pipeline_task_num=2) */
 COUNT(*) FROM (SELECT * FROM tpch_tiny_nation LIMIT 10) t1
diff --git a/regression-test/suites/nereids_p0/sql_functions/bitmap_functions/test_bitmap_function.groovy b/regression-test/suites/nereids_p0/sql_functions/bitmap_functions/test_bitmap_function.groovy
index a08dc1f21c..1c358f6c95 100644
--- a/regression-test/suites/nereids_p0/sql_functions/bitmap_functions/test_bitmap_function.groovy
+++ b/regression-test/suites/nereids_p0/sql_functions/bitmap_functions/test_bitmap_function.groovy
@@ -618,8 +618,8 @@ suite("test_bitmap_function") {
     qt_sql """ select orthogonal_bitmap_intersect_count(members, tag_group, 1150000, 1150001, 390006) from ${arthogonalBitmapTable} where  tag_group in ( 1150000, 1150001, 390006); """
     qt_sql """ select orthogonal_bitmap_union_count(members) from ${arthogonalBitmapTable} where  tag_group in ( 1150000, 1150001, 390006);  """
     qt_sql_orthogonal_bitmap_intersect_count2 """ select orthogonal_bitmap_intersect_count(members, tag_group, 1,2) from test_arthogonal_bitmap; """
-    qt_sql_orthogonal_bitmap_intersect_count3_1 """ select /*+SET_VAR(parallel_fragment_exec_instance_num=1)*/orthogonal_bitmap_intersect_count(members, tag_group, 1,11) from test_arthogonal_bitmap; """
-    qt_sql_orthogonal_bitmap_intersect_count3_2 """ select /*+SET_VAR(parallel_fragment_exec_instance_num=2)*/orthogonal_bitmap_intersect_count(members, tag_group, 1,11) from test_arthogonal_bitmap; """
+    qt_sql_orthogonal_bitmap_intersect_count3_1 """ select /*+SET_VAR(parallel_fragment_exec_instance_num=1, parallel_pipeline_task_num=1)*/orthogonal_bitmap_intersect_count(members, tag_group, 1,11) from test_arthogonal_bitmap; """
+    qt_sql_orthogonal_bitmap_intersect_count3_2 """ select /*+SET_VAR(parallel_fragment_exec_instance_num=2, parallel_pipeline_task_num=2)*/orthogonal_bitmap_intersect_count(members, tag_group, 1,11) from test_arthogonal_bitmap; """
     qt_sql_orthogonal_bitmap_intersect_count4 """ select orthogonal_bitmap_intersect_count(members, tag_group, 2,12) from test_arthogonal_bitmap; """
     qt_sql_orthogonal_bitmap_union_count2 """ select orthogonal_bitmap_union_count( cast(null as bitmap)) from test_arthogonal_bitmap; """
     qt_sql_orthogonal_bitmap_union_count3 """ select orthogonal_bitmap_union_count(members) from test_arthogonal_bitmap; """
diff --git a/regression-test/suites/nereids_p0/sql_functions/conditional_functions/test_nullif.groovy b/regression-test/suites/nereids_p0/sql_functions/conditional_functions/test_nullif.groovy
index d99af35e2e..81397dc657 100644
--- a/regression-test/suites/nereids_p0/sql_functions/conditional_functions/test_nullif.groovy
+++ b/regression-test/suites/nereids_p0/sql_functions/conditional_functions/test_nullif.groovy
@@ -99,11 +99,11 @@ suite("test_nullif") {
     qt_if_nullif8 """select ifnull(b.k1, -1) k1 from baseall a left join bigtable b on a.k1 = b.k1 + 5 
             order by a.k1"""
     // make sure stable
-    qt_if_nullif8_1 """select   /*+ SET_VAR(enable_pipeline_engine=false,parallel_fragment_exec_instance_num=2,enable_share_hash_table_for_broadcast_join=true) */ b.k1, ifnull(b.k1, -1) k1 from baseall a left join bigtable b on a.k1 = b.k1 + 5 order by a.k1;"""
-    qt_if_nullif8_2 """select   /*+ SET_VAR(enable_pipeline_engine=false,parallel_fragment_exec_instance_num=2,enable_share_hash_table_for_broadcast_join=true) */ b.k1, ifnull(b.k1, -1) k1 from baseall a left join bigtable b on a.k1 = b.k1 + 5 order by a.k1;"""
-    qt_if_nullif8_3 """select   /*+ SET_VAR(enable_pipeline_engine=false,parallel_fragment_exec_instance_num=2,enable_share_hash_table_for_broadcast_join=true) */ b.k1, ifnull(b.k1, -1) k1 from baseall a left join bigtable b on a.k1 = b.k1 + 5 order by a.k1;"""
-    qt_if_nullif8_4 """select   /*+ SET_VAR(enable_pipeline_engine=false,parallel_fragment_exec_instance_num=2,enable_share_hash_table_for_broadcast_join=true) */ b.k1, ifnull(b.k1, -1) k1 from baseall a left join bigtable b on a.k1 = b.k1 + 5 order by a.k1;"""
-    qt_if_nullif8_5 """select   /*+ SET_VAR(enable_pipeline_engine=false,parallel_fragment_exec_instance_num=2,enable_share_hash_table_for_broadcast_join=true) */ b.k1, ifnull(b.k1, -1) k1 from baseall a left join bigtable b on a.k1 = b.k1 + 5 order by a.k1;"""
+    qt_if_nullif8_1 """select   /*+ SET_VAR(enable_pipeline_engine=false,parallel_fragment_exec_instance_num=2,parallel_pipeline_task_num=2,enable_share_hash_table_for_broadcast_join=true) */ b.k1, ifnull(b.k1, -1) k1 from baseall a left join bigtable b on a.k1 = b.k1 + 5 order by a.k1;"""
+    qt_if_nullif8_2 """select   /*+ SET_VAR(enable_pipeline_engine=false,parallel_fragment_exec_instance_num=2,parallel_pipeline_task_num=2,enable_share_hash_table_for_broadcast_join=true) */ b.k1, ifnull(b.k1, -1) k1 from baseall a left join bigtable b on a.k1 = b.k1 + 5 order by a.k1;"""
+    qt_if_nullif8_3 """select   /*+ SET_VAR(enable_pipeline_engine=false,parallel_fragment_exec_instance_num=2,parallel_pipeline_task_num=2,enable_share_hash_table_for_broadcast_join=true) */ b.k1, ifnull(b.k1, -1) k1 from baseall a left join bigtable b on a.k1 = b.k1 + 5 order by a.k1;"""
+    qt_if_nullif8_4 """select   /*+ SET_VAR(enable_pipeline_engine=false,parallel_fragment_exec_instance_num=2,parallel_pipeline_task_num=2,enable_share_hash_table_for_broadcast_join=true) */ b.k1, ifnull(b.k1, -1) k1 from baseall a left join bigtable b on a.k1 = b.k1 + 5 order by a.k1;"""
+    qt_if_nullif8_5 """select   /*+ SET_VAR(enable_pipeline_engine=false,parallel_fragment_exec_instance_num=2,parallel_pipeline_task_num=2,enable_share_hash_table_for_broadcast_join=true) */ b.k1, ifnull(b.k1, -1) k1 from baseall a left join bigtable b on a.k1 = b.k1 + 5 order by a.k1;"""
     qt_if_nullif10 """select ifnull(b.k6, "hll") k1 from baseall a left join bigtable b on a.k1 = b.k1 + 5 
             order by k1"""
     qt_if_nullif11 """select ifnull(b.k10, "2017-06-06") k1 from baseall a left join bigtable b on 
diff --git a/regression-test/suites/nereids_p0/sql_functions/window_functions/test_window_function.groovy b/regression-test/suites/nereids_p0/sql_functions/window_functions/test_window_function.groovy
index 904d0ae8e7..4860f1d33c 100644
--- a/regression-test/suites/nereids_p0/sql_functions/window_functions/test_window_function.groovy
+++ b/regression-test/suites/nereids_p0/sql_functions/window_functions/test_window_function.groovy
@@ -532,10 +532,10 @@ suite("test_window_function") {
     String cur
     for (p in range(0, 829)) {
         if (p == 0) {
-            cur = "(select /*+SET_VAR(parallel_fragment_exec_instance_num=1) */ ${k1}, 1 as wj from baseall order by ${k1}, ${k3} limit 1)".toString()
+            cur = "(select /*+SET_VAR(parallel_fragment_exec_instance_num=1, parallel_pipeline_task_num=1) */ ${k1}, 1 as wj from baseall order by ${k1}, ${k3} limit 1)".toString()
         }
         else {
-            cur = """(select /*+SET_VAR(parallel_fragment_exec_instance_num=1) */ ${k1}, ${p+1} as wj from baseall order by ${k1} , ${k3}
+            cur = """(select /*+SET_VAR(parallel_fragment_exec_instance_num=1, parallel_pipeline_task_num=1) */ ${k1}, ${p+1} as wj from baseall order by ${k1} , ${k3}
                     limit ${p}, 1 ) """.toString()
 
         }
@@ -559,9 +559,9 @@ suite("test_window_function") {
     line = "("
     for (p in range(0, 829)) {
         if (p == 0 ) {
-            cur = "(select /*+SET_VAR(parallel_fragment_exec_instance_num=1) */ * from baseall order by k1, k6 limit 1)"
+            cur = "(select /*+SET_VAR(parallel_fragment_exec_instance_num=1, parallel_pipeline_task_num=1) */ * from baseall order by k1, k6 limit 1)"
         } else {
-            cur = "(select /*+SET_VAR(parallel_fragment_exec_instance_num=1) */ * from baseall order by k1, k6 limit ${p}, 1)"
+            cur = "(select /*+SET_VAR(parallel_fragment_exec_instance_num=1, parallel_pipeline_task_num=1) */ * from baseall order by k1, k6 limit ${p}, 1)"
         }
         if (p < 828) {
             line = line + cur + " union all "
diff --git a/regression-test/suites/nereids_syntax_p0/null_aware_left_anti_join.groovy b/regression-test/suites/nereids_syntax_p0/null_aware_left_anti_join.groovy
index 0a70404b90..664dc91867 100644
--- a/regression-test/suites/nereids_syntax_p0/null_aware_left_anti_join.groovy
+++ b/regression-test/suites/nereids_syntax_p0/null_aware_left_anti_join.groovy
@@ -64,6 +64,7 @@ suite("test_nereids_null_aware_left_anti_join") {
 
     sql "SET enable_fallback_to_original_planner=true"
     sql """ set parallel_fragment_exec_instance_num=2; """
+    sql """ set parallel_pipeline_task_num=2; """
     sql "SET enable_fallback_to_original_planner=false"
     qt_select """ select ${tableName2}.k1 from ${tableName2} where k1 not in (select ${tableName1}.k1 from ${tableName1}) order by ${tableName2}.k1; """
 
diff --git a/regression-test/suites/nereids_tpcds_shape_sf100_p0/ddl/case.tmpl b/regression-test/suites/nereids_tpcds_shape_sf100_p0/ddl/case.tmpl
index 348eed8f54..8d930d4f86 100644
--- a/regression-test/suites/nereids_tpcds_shape_sf100_p0/ddl/case.tmpl
+++ b/regression-test/suites/nereids_tpcds_shape_sf100_p0/ddl/case.tmpl
@@ -24,7 +24,8 @@ suite("query{--}") {
     sql 'set enable_fallback_to_original_planner=false'
     sql 'set exec_mem_limit=21G'
     sql 'set be_number_for_test=3'
-    sql 'set parallel_fragment_exec_instance_num=8'
+    sql 'set parallel_fragment_exec_instance_num=8; '
+    sql 'set parallel_pipeline_task_num=8; '
     sql 'set forbid_unknown_col_stats=true'
     sql 'set broadcast_row_count_limit = 30000000'
     sql 'set enable_nereids_timeout = false'


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org