Posted to commits@doris.apache.org by mo...@apache.org on 2024/04/21 05:19:15 UTC

(doris) branch master updated: [improvement](mtmv) Support union rewrite when the materialized view is not enough to provide all the data for the query (#33800)

This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 06c9fb66977 [improvement](mtmv) Support union rewrite when the materialized view is not enough to provide all the data for the query (#33800)
06c9fb66977 is described below

commit 06c9fb669775577b94aa045519749b36670def3c
Author: seawinde <14...@users.noreply.github.com>
AuthorDate: Sun Apr 21 13:19:07 2024 +0800

    [improvement](mtmv) Support union rewrite when the materialized view is not enough to provide all the data for the query (#33800)
    
    When the materialized view cannot provide all the data a query needs, and the materialized view is incrementally updated by partition, we can union the materialized view with the original query to answer the query.
    
    This depends on https://github.com/apache/doris/pull/33362
    
    For example, given the following materialized view definition:
    
    >         CREATE MATERIALIZED VIEW mv_10086
    >         BUILD IMMEDIATE REFRESH AUTO ON MANUAL
    >         partition by(l_shipdate)
    >         DISTRIBUTED BY RANDOM BUCKETS 2
    >         PROPERTIES ('replication_num' = '1')
    >         AS
    >     select l_shipdate, o_orderdate, l_partkey, l_suppkey, sum(o_totalprice) as sum_total
    >     from lineitem
    >     left join orders on lineitem.l_orderkey = orders.o_orderkey and l_shipdate = o_orderdate
    >     group by
    >     l_shipdate,
    >     o_orderdate,
    >     l_partkey,
    >     l_suppkey;
    
    The materialized view contains the following data:
    +------------+-------------+-----------+-----------+-----------+
    | l_shipdate | o_orderdate | l_partkey | l_suppkey | sum_total |
    +------------+-------------+-----------+-----------+-----------+
    | 2023-10-18 | 2023-10-18  |         2 |         3 |    109.20 |
    | 2023-10-17 | 2023-10-17  |         2 |         3 |     99.50 |
    | 2023-10-19 | 2023-10-19  |         2 |         3 |     99.50 |
    +------------+-------------+-----------+-----------+-----------+
    
    After new data is inserted into partition `2023-10-17`, suppose we run the following query:
    ```
        select l_shipdate, o_orderdate, l_partkey, l_suppkey, sum(o_totalprice) as sum_total
        from lineitem
        left join orders on lineitem.l_orderkey = orders.o_orderkey and l_shipdate = o_orderdate
        group by
        l_shipdate,
        o_orderdate,
        l_partkey,
        l_suppkey;
    ```
    Query rewrite by the materialized view fails with the message `Check partition query used validation fail`.
    If we turn on the switch with `SET enable_materialized_view_union_rewrite = true;` (it defaults to true)
    and run the query above again, the rewrite succeeds: the plan uses UNION ALL to combine the materialized view with the original query and answers the query correctly. The plan is as follows:
    
    
    ```
    | Explain String(Nereids Planner)                                                                                                                                                                    |
    +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    | PLAN FRAGMENT 0                                                                                                                                                                                    |
    |   OUTPUT EXPRS:                                                                                                                                                                                    |
    |     l_shipdate[#52]                                                                                                                                                                                |
    |     o_orderdate[#53]                                                                                                                                                                               |
    |     l_partkey[#54]                                                                                                                                                                                 |
    |     l_suppkey[#55]                                                                                                                                                                                 |
    |     sum_total[#56]                                                                                                                                                                                 |
    |   PARTITION: UNPARTITIONED                                                                                                                                                                         |
    |                                                                                                                                                                                                    |
    |   HAS_COLO_PLAN_NODE: false                                                                                                                                                                        |
    |                                                                                                                                                                                                    |
    |   VRESULT SINK                                                                                                                                                                                     |
    |      MYSQL_PROTOCAL                                                                                                                                                                                |
    |                                                                                                                                                                                                    |
    |   11:VEXCHANGE                                                                                                                                                                                     |
    |      offset: 0                                                                                                                                                                                     |
    |      distribute expr lists:                                                                                                                                                                        |
    |                                                                                                                                                                                                    |
    | PLAN FRAGMENT 1                                                                                                                                                                                    |
    |                                                                                                                                                                                                    |
    |   PARTITION: HASH_PARTITIONED: l_shipdate[#42], o_orderdate[#43], l_partkey[#44], l_suppkey[#45]                                                                                                   |
    |                                                                                                                                                                                                    |
    |   HAS_COLO_PLAN_NODE: false                                                                                                                                                                        |
    |                                                                                                                                                                                                    |
    |   STREAM DATA SINK                                                                                                                                                                                 |
    |     EXCHANGE ID: 11                                                                                                                                                                                |
    |     UNPARTITIONED                                                                                                                                                                                  |
    |                                                                                                                                                                                                    |
    |   10:VUNION(756)                                                                                                                                                                                   |
    |   |                                                                                                                                                                                                |
    |   |----9:VAGGREGATE (merge finalize)(753)                                                                                                                                                          |
    |   |    |  output: sum(partial_sum(o_totalprice)[#46])[#51]                                                                                                                                         |
    |   |    |  group by: l_shipdate[#42], o_orderdate[#43], l_partkey[#44], l_suppkey[#45]                                                                                                              |
    |   |    |  cardinality=2                                                                                                                                                                            |
    |   |    |  distribute expr lists: l_shipdate[#42], o_orderdate[#43], l_partkey[#44], l_suppkey[#45]                                                                                                 |
    |   |    |                                                                                                                                                                                           |
    |   |    8:VEXCHANGE                                                                                                                                                                                 |
    |   |       offset: 0                                                                                                                                                                                |
    |   |       distribute expr lists: l_shipdate[#42]                                                                                                                                                   |
    |   |                                                                                                                                                                                                |
    |   1:VEXCHANGE                                                                                                                                                                                      |
    |      offset: 0                                                                                                                                                                                     |
    |      distribute expr lists:                                                                                                                                                                        |
    |                                                                                                                                                                                                    |
    | PLAN FRAGMENT 2                                                                                                                                                                                    |
    |                                                                                                                                                                                                    |
    |   PARTITION: HASH_PARTITIONED: o_orderkey[#21], o_orderdate[#25]                                                                                                                                   |
    |                                                                                                                                                                                                    |
    |   HAS_COLO_PLAN_NODE: false                                                                                                                                                                        |
    |                                                                                                                                                                                                    |
    |   STREAM DATA SINK                                                                                                                                                                                 |
    |     EXCHANGE ID: 08                                                                                                                                                                                |
    |     HASH_PARTITIONED: l_shipdate[#42], o_orderdate[#43], l_partkey[#44], l_suppkey[#45]                                                                                                            |
    |                                                                                                                                                                                                    |
    |   7:VAGGREGATE (update serialize)(747)                                                                                                                                                             |
    |   |  STREAMING                                                                                                                                                                                     |
    |   |  output: partial_sum(o_totalprice[#41])[#46]                                                                                                                                                   |
    |   |  group by: l_shipdate[#37], o_orderdate[#38], l_partkey[#39], l_suppkey[#40]                                                                                                                   |
    |   |  cardinality=2                                                                                                                                                                                 |
    |   |  distribute expr lists: l_shipdate[#37]                                                                                                                                                        |
    |   |                                                                                                                                                                                                |
    |   6:VHASH JOIN(741)                                                                                                                                                                                |
    |   |  join op: RIGHT OUTER JOIN(PARTITIONED)[]                                                                                                                                                      |
    |   |  equal join conjunct: (o_orderkey[#21] = l_orderkey[#5])                                                                                                                                       |
    |   |  equal join conjunct: (o_orderdate[#25] = l_shipdate[#15])                                                                                                                                     |
    |   |  runtime filters: RF000[min_max] <- l_orderkey[#5](2/2/2048), RF001[bloom] <- l_orderkey[#5](2/2/2048), RF002[min_max] <- l_shipdate[#15](1/1/2048), RF003[bloom] <- l_shipdate[#15](1/1/2048) |
    |   |  cardinality=2                                                                                                                                                                                 |
    |   |  vec output tuple id: 4                                                                                                                                                                        |
    |   |  output tuple id: 4                                                                                                                                                                            |
    |   |  vIntermediate tuple ids: 3                                                                                                                                                                    |
    |   |  hash output slot ids: 6 7 24 25 15                                                                                                                                                            |
    |   |  final projections: l_shipdate[#36], o_orderdate[#32], l_partkey[#34], l_suppkey[#35], o_totalprice[#31]                                                                                       |
    |   |  final project output tuple id: 4                                                                                                                                                              |
    |   |  distribute expr lists: o_orderkey[#21], o_orderdate[#25]                                                                                                                                      |
    |   |  distribute expr lists: l_orderkey[#5], l_shipdate[#15]                                                                                                                                        |
    |   |                                                                                                                                                                                                |
    |   |----3:VEXCHANGE                                                                                                                                                                                 |
    |   |       offset: 0                                                                                                                                                                                |
    |   |       distribute expr lists: l_orderkey[#5]                                                                                                                                                    |
    |   |                                                                                                                                                                                                |
    |   5:VEXCHANGE                                                                                                                                                                                      |
    |      offset: 0                                                                                                                                                                                     |
    |      distribute expr lists:                                                                                                                                                                        |
    |                                                                                                                                                                                                    |
    | PLAN FRAGMENT 3                                                                                                                                                                                    |
    |                                                                                                                                                                                                    |
    |   PARTITION: RANDOM                                                                                                                                                                                |
    |                                                                                                                                                                                                    |
    |   HAS_COLO_PLAN_NODE: false                                                                                                                                                                        |
    |                                                                                                                                                                                                    |
    |   STREAM DATA SINK                                                                                                                                                                                 |
    |     EXCHANGE ID: 05                                                                                                                                                                                |
    |     HASH_PARTITIONED: o_orderkey[#21], o_orderdate[#25]                                                                                                                                            |
    |                                                                                                                                                                                                    |
    |   4:VOlapScanNode(722)                                                                                                                                                                             |
    |      TABLE: union_db.orders(orders), PREAGGREGATION: ON                                                                                                                                            |
    |      runtime filters: RF000[min_max] -> o_orderkey[#21], RF001[bloom] -> o_orderkey[#21], RF002[min_max] -> o_orderdate[#25], RF003[bloom] -> o_orderdate[#25]                                     |
    |      partitions=3/3 (p_20231017,p_20231018,p_20231019), tablets=9/9, tabletList=161188,161190,161192 ...                                                                                           |
    |      cardinality=3, avgRowSize=0.0, numNodes=1                                                                                                                                                     |
    |      pushAggOp=NONE                                                                                                                                                                                |
    |                                                                                                                                                                                                    |
    | PLAN FRAGMENT 4                                                                                                                                                                                    |
    |                                                                                                                                                                                                    |
    |   PARTITION: HASH_PARTITIONED: l_orderkey[#5]                                                                                                                                                      |
    |                                                                                                                                                                                                    |
    |   HAS_COLO_PLAN_NODE: false                                                                                                                                                                        |
    |                                                                                                                                                                                                    |
    |   STREAM DATA SINK                                                                                                                                                                                 |
    |     EXCHANGE ID: 03                                                                                                                                                                                |
    |     HASH_PARTITIONED: l_orderkey[#5], l_shipdate[#15]                                                                                                                                              |
    |                                                                                                                                                                                                    |
    |   2:VOlapScanNode(729)                                                                                                                                                                             |
    |      TABLE: union_db.lineitem(lineitem), PREAGGREGATION: ON                                                                                                                                        |
    |      PREDICATES: (l_shipdate[#15] >= '2023-10-17') AND (l_shipdate[#15] < '2023-10-18')                                                                                                            |
    |      partitions=1/3 (p_20231017), tablets=3/3, tabletList=161223,161225,161227                                                                                                                     |
    |      cardinality=2, avgRowSize=0.0, numNodes=1                                                                                                                                                     |
    |      pushAggOp=NONE                                                                                                                                                                                |
    |                                                                                                                                                                                                    |
    | PLAN FRAGMENT 5                                                                                                                                                                                    |
    |                                                                                                                                                                                                    |
    |   PARTITION: RANDOM                                                                                                                                                                                |
    |                                                                                                                                                                                                    |
    |   HAS_COLO_PLAN_NODE: false                                                                                                                                                                        |
    |                                                                                                                                                                                                    |
    |   STREAM DATA SINK                                                                                                                                                                                 |
    |     EXCHANGE ID: 01                                                                                                                                                                                |
    |     RANDOM                                                                                                                                                                                         |
    |                                                                                                                                                                                                    |
    |   0:VOlapScanNode(718)                                                                                                                                                                             |
    |      TABLE: union_db.mv_10086(mv_10086), PREAGGREGATION: ON                                                                                                                                        |
    |      partitions=2/3 (p_20231018_20231019,p_20231019_20231020), tablets=4/4, tabletList=161251,161253,161265 ...                                                                                    |
    |      cardinality=2, avgRowSize=0.0, numNodes=1                                                                                                                                                     |
    |      pushAggOp=NONE                                                                                                                                                                                |
    |                                                                                                                                                                                                    |
    | MaterializedView                                                                                                                                                                                   |
    | MaterializedViewRewriteSuccessAndChose:                                                                                                                                                            |
    |   Names: mv_10086                                                                                                                                                                                  |
    | MaterializedViewRewriteSuccessButNotChose:                                                                                                                                                         |
    |                                                                                                                                                                                                    |
    | MaterializedViewRewriteFail:                                                                                                                                                                       |
    +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    ```
---
 .../main/java/org/apache/doris/mtmv/MTMVCache.java |  48 +-
 .../processor/post/CommonSubExpressionOpt.java     |   5 +-
 .../mv/AbstractMaterializedViewRule.java           | 218 ++++--
 .../exploration/mv/MaterializedViewUtils.java      |  64 +-
 .../nereids/rules/exploration/mv/Predicates.java   |  52 ++
 .../nereids/rules/exploration/mv/StructInfo.java   |  70 ++
 .../doris/nereids/rules/rewrite/EliminateSort.java |   4 +-
 .../plans/commands/UpdateMvByPartitionCommand.java |  32 +-
 .../plans/visitor/ExpressionLineageReplacer.java   |   6 +-
 .../java/org/apache/doris/qe/SessionVariable.java  |  14 +
 .../nereids_rules_p0/mv/partition_mv_rewrite.out   |  29 +
 .../mv/dimension/dimension_self_conn.groovy        | 564 ++++++++++++++
 .../mv/nested_mtmv/nested_mtmv.groovy              | 859 +++++++++++++++++++++
 .../mv/partition_mv_rewrite.groovy                 |  17 +
 .../partition_curd_union_rewrite.groovy}           |  85 +-
 .../mv/union_rewrite/usercase_union_rewrite.groovy | 174 +++++
 16 files changed, 2117 insertions(+), 124 deletions(-)
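
Note (not part of the commit): the eligibility check that this patch adds in
checkCanUnionRewrite can be read as a set comparison. The sketch below is a
minimal illustration under stated assumptions, not the Doris implementation.
Partitions are modelled as plain strings instead of PartitionKeyDesc objects,
and the class name, method names, and partition names are all hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified model of the union rewrite decision.
final class UnionRewriteSketch {

    // Union rewrite is attempted only when the feature is enabled, the query touches
    // at least one invalid mv partition, and the invalid partitions do not cover
    // every partition the query reads (otherwise the mv contributes nothing).
    static boolean canUnionRewrite(Set<String> invalidMvPartitions,
            Set<String> queryUsedPartitions, boolean unionRewriteEnabled) {
        if (invalidMvPartitions.isEmpty() || !unionRewriteEnabled) {
            return false;
        }
        return !invalidMvPartitions.containsAll(queryUsedPartitions);
    }

    // Partitions served by the materialized view branch of the union:
    // everything the query reads except the invalid partitions.
    static Set<String> partitionsFromMv(Set<String> invalidMvPartitions,
            Set<String> queryUsedPartitions) {
        Set<String> result = new TreeSet<>(queryUsedPartitions);
        result.removeAll(invalidMvPartitions);
        return result;
    }

    // Partitions served by the base table branch of the union:
    // the invalid partitions that the query actually reads.
    static Set<String> partitionsFromBaseTable(Set<String> invalidMvPartitions,
            Set<String> queryUsedPartitions) {
        Set<String> result = new TreeSet<>(queryUsedPartitions);
        result.retainAll(invalidMvPartitions);
        return result;
    }

    public static void main(String[] args) {
        // Illustrative partition names only.
        Set<String> used = new HashSet<>(Arrays.asList("p_17", "p_18", "p_19"));
        Set<String> invalid = new HashSet<>(Arrays.asList("p_19"));
        System.out.println(canUnionRewrite(invalid, used, true));
        System.out.println(partitionsFromMv(invalid, used));
        System.out.println(partitionsFromBaseTable(invalid, used));
    }
}
```

In the patch itself the two branches are built by removing the invalid
partition ids from the materialized view scan and by adding a partition filter
to the original query plan, and the results are combined with UNION ALL.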

diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCache.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCache.java
index 9116817834d..154bd4ec7f1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCache.java
@@ -17,27 +17,25 @@
 
 package org.apache.doris.mtmv;
 
-import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.MTMV;
-import org.apache.doris.catalog.Partition;
+import org.apache.doris.nereids.CascadesContext;
 import org.apache.doris.nereids.NereidsPlanner;
 import org.apache.doris.nereids.StatementContext;
-import org.apache.doris.nereids.analyzer.UnboundResultSink;
-import org.apache.doris.nereids.analyzer.UnboundTableSink;
+import org.apache.doris.nereids.jobs.executor.Rewriter;
 import org.apache.doris.nereids.parser.NereidsParser;
 import org.apache.doris.nereids.properties.PhysicalProperties;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewUtils;
+import org.apache.doris.nereids.rules.rewrite.EliminateSort;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
-import org.apache.doris.nereids.trees.plans.logical.LogicalTableSink;
+import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink;
 import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
-import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.OriginStatement;
 
-import com.google.common.collect.Lists;
-
-import java.util.stream.Collectors;
+import com.google.common.collect.ImmutableList;
 
 /**
  * The cache for materialized view cache
@@ -70,27 +68,25 @@ public class MTMVCache {
         if (mvSqlStatementContext.getConnectContext().getStatementContext() == null) {
             mvSqlStatementContext.getConnectContext().setStatementContext(mvSqlStatementContext);
         }
-        unboundMvPlan = unboundMvPlan.accept(new DefaultPlanVisitor<LogicalPlan, Void>() {
-            // convert to table sink to eliminate sort under table sink, because sort under result sink can not be
-            // eliminated
-            @Override
-            public LogicalPlan visitUnboundResultSink(UnboundResultSink<? extends Plan> unboundResultSink,
-                    Void context) {
-                return new UnboundTableSink<>(mtmv.getFullQualifiers(),
-                        mtmv.getBaseSchema().stream().map(Column::getName).collect(Collectors.toList()),
-                        Lists.newArrayList(),
-                        mtmv.getPartitions().stream().map(Partition::getName).collect(Collectors.toList()),
-                        unboundResultSink.child());
-            }
-        }, null);
+        // Cannot convert to a table sink: in a self join the same column comes from different tables,
+        // so the output slots would be wrong
         Plan originPlan = planner.plan(unboundMvPlan, PhysicalProperties.ANY, ExplainLevel.REWRITTEN_PLAN);
-        // eliminate logicalTableSink because sink operator is useless in query rewrite by materialized view
-        Plan mvPlan = planner.getCascadesContext().getRewritePlan().accept(new DefaultPlanRewriter<Object>() {
+        // Eliminate result sink because sink operator is useless in query rewrite by materialized view
+        // and the top sort can also be removed
+        Plan mvPlan = originPlan.accept(new DefaultPlanRewriter<Object>() {
             @Override
-            public Plan visitLogicalTableSink(LogicalTableSink<? extends Plan> logicalTableSink, Object context) {
-                return logicalTableSink.child().accept(this, context);
+            public Plan visitLogicalResultSink(LogicalResultSink<? extends Plan> logicalResultSink, Object context) {
+                return logicalResultSink.child().accept(this, context);
             }
         }, null);
+        // Optimize by rules to remove top sort
+        CascadesContext parentCascadesContext = CascadesContext.initContext(mvSqlStatementContext, mvPlan,
+                PhysicalProperties.ANY);
+        mvPlan = MaterializedViewUtils.rewriteByRules(parentCascadesContext, childContext -> {
+            Rewriter.getCteChildrenRewriter(childContext,
+                    ImmutableList.of(Rewriter.custom(RuleType.ELIMINATE_SORT, EliminateSort::new))).execute();
+            return childContext.getRewritePlan();
+        }, mvPlan, originPlan);
         return new MTMVCache(mvPlan, originPlan);
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/CommonSubExpressionOpt.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/CommonSubExpressionOpt.java
index 44e65dee1c9..8f558aaba02 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/CommonSubExpressionOpt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/CommonSubExpressionOpt.java
@@ -83,8 +83,9 @@ public class CommonSubExpressionOpt extends PlanPostProcessor {
                         // 'case slot whenClause2 END'
                         // This is illegal.
                         Expression rewritten = expr.accept(ExpressionReplacer.INSTANCE, aliasMap);
-                        Alias alias = new Alias(rewritten);
-                        aliasMap.put(expr, alias);
+                        // If rewritten is already an alias, use it directly, because materialized view rewriting
+                        // must keep the output slot unchanged after a successful rewrite
+                        aliasMap.put(expr, rewritten instanceof Alias ? (Alias) rewritten : new Alias(rewritten));
                     }
                 });
                 layer.addAll(aliasMap.values());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
index b596408851f..6bf47f00c35 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
@@ -17,27 +17,33 @@
 
 package org.apache.doris.nereids.rules.exploration.mv;
 
+import org.apache.doris.analysis.PartitionKeyDesc;
 import org.apache.doris.catalog.MTMV;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.PartitionInfo;
+import org.apache.doris.catalog.PartitionItem;
 import org.apache.doris.catalog.PartitionType;
 import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.Pair;
 import org.apache.doris.mtmv.BaseTableInfo;
 import org.apache.doris.mtmv.MTMVPartitionInfo;
 import org.apache.doris.mtmv.MTMVRewriteUtil;
 import org.apache.doris.nereids.CascadesContext;
 import org.apache.doris.nereids.jobs.executor.Rewriter;
+import org.apache.doris.nereids.properties.LogicalProperties;
 import org.apache.doris.nereids.rules.exploration.ExplorationRuleFactory;
 import org.apache.doris.nereids.rules.exploration.mv.Predicates.SplitPredicate;
+import org.apache.doris.nereids.rules.exploration.mv.StructInfo.InvalidPartitionRemover;
+import org.apache.doris.nereids.rules.exploration.mv.StructInfo.QueryScanPartitionsCollector;
 import org.apache.doris.nereids.rules.exploration.mv.mapping.ExpressionMapping;
 import org.apache.doris.nereids.rules.exploration.mv.mapping.RelationMapping;
 import org.apache.doris.nereids.rules.exploration.mv.mapping.SlotMapping;
 import org.apache.doris.nereids.trees.expressions.Alias;
-import org.apache.doris.nereids.trees.expressions.ExprId;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.NamedExpression;
 import org.apache.doris.nereids.trees.expressions.Not;
 import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
 import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
 import org.apache.doris.nereids.trees.expressions.functions.scalar.NonNullable;
 import org.apache.doris.nereids.trees.expressions.functions.scalar.Nullable;
@@ -46,20 +52,26 @@ import org.apache.doris.nereids.trees.expressions.literal.Literal;
 import org.apache.doris.nereids.trees.plans.JoinType;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;
+import org.apache.doris.nereids.trees.plans.algebra.SetOperation.Qualifier;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
 import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;
 import org.apache.doris.nereids.util.ExpressionUtils;
 import org.apache.doris.nereids.util.TypeUtils;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMultimap;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -129,20 +141,22 @@ public abstract class AbstractMaterializedViewRule implements ExplorationRuleFac
      */
     protected List<StructInfo> getValidQueryStructInfos(Plan queryPlan, CascadesContext cascadesContext,
             BitSet materializedViewTableSet) {
-        return MaterializedViewUtils.extractStructInfo(queryPlan, cascadesContext, materializedViewTableSet)
-                .stream()
-                .filter(queryStructInfo -> {
-                    boolean valid = checkPattern(queryStructInfo);
-                    if (!valid) {
-                        cascadesContext.getMaterializationContexts().forEach(ctx ->
-                                ctx.recordFailReason(queryStructInfo, "Query struct info is invalid",
-                                        () -> String.format("query table bitmap is %s, plan is %s",
-                                                queryStructInfo.getTableBitSet(), queryPlan.treeString())
-                                ));
-                    }
-                    return valid;
-                })
-                .collect(Collectors.toList());
+        List<StructInfo> validStructInfos = new ArrayList<>();
+        List<StructInfo> uncheckedStructInfos = MaterializedViewUtils.extractStructInfo(queryPlan, cascadesContext,
+                materializedViewTableSet);
+        uncheckedStructInfos.forEach(queryStructInfo -> {
+            boolean valid = checkPattern(queryStructInfo);
+            if (!valid) {
+                cascadesContext.getMaterializationContexts().forEach(ctx ->
+                        ctx.recordFailReason(queryStructInfo, "Query struct info is invalid",
+                                () -> String.format("query table bitmap is %s, plan is %s",
+                                        queryStructInfo.getTableBitSet(), queryPlan.treeString())
+                        ));
+            } else {
+                validStructInfos.add(queryStructInfo);
+            }
+        });
+        return validStructInfos;
     }
 
     /**
@@ -200,13 +214,13 @@ public abstract class AbstractMaterializedViewRule implements ExplorationRuleFac
             }
             Plan rewrittenPlan;
             Plan mvScan = materializationContext.getMvScanPlan();
-            Plan topPlan = queryStructInfo.getTopPlan();
+            Plan queryPlan = queryStructInfo.getTopPlan();
             if (compensatePredicates.isAlwaysTrue()) {
                 rewrittenPlan = mvScan;
             } else {
                 // Try to rewrite compensate predicates by using mv scan
                 List<Expression> rewriteCompensatePredicates = rewriteExpression(compensatePredicates.toList(),
-                        topPlan, materializationContext.getMvExprToMvScanExprMapping(),
+                        queryPlan, materializationContext.getMvExprToMvScanExprMapping(),
                         viewToQuerySlotMapping, true, queryStructInfo.getTableBitSet());
                 if (rewriteCompensatePredicates.isEmpty()) {
                     materializationContext.recordFailReason(queryStructInfo,
@@ -225,65 +239,125 @@ public abstract class AbstractMaterializedViewRule implements ExplorationRuleFac
             if (rewrittenPlan == null) {
                 continue;
             }
-            final Plan finalRewrittenPlan = rewriteByRules(cascadesContext, rewrittenPlan, topPlan);
-            if (!isOutputValid(topPlan, finalRewrittenPlan)) {
-                materializationContext.recordFailReason(queryStructInfo,
-                        "RewrittenPlan output logical properties is different with target group",
-                        () -> String.format("planOutput logical"
-                                        + " properties = %s,\n groupOutput logical properties = %s",
-                                finalRewrittenPlan.getLogicalProperties(), topPlan.getLogicalProperties()));
-                continue;
-            }
+            rewrittenPlan = MaterializedViewUtils.rewriteByRules(cascadesContext,
+                    childContext -> {
+                        Rewriter.getWholeTreeRewriter(childContext).execute();
+                        return childContext.getRewritePlan();
+                    }, rewrittenPlan, queryPlan);
             // check the partitions used by rewritten plan is valid or not
-            Set<Long> invalidPartitionsQueryUsed =
-                    calcInvalidPartitions(finalRewrittenPlan, materializationContext, cascadesContext);
-            if (!invalidPartitionsQueryUsed.isEmpty()) {
+            Multimap<Pair<MTMVPartitionInfo, PartitionInfo>, Partition> invalidPartitionsQueryUsed =
+                    calcUsedInvalidMvPartitions(rewrittenPlan, materializationContext, cascadesContext);
+            // Fail when the query uses invalid partitions and union rewrite is disabled
+            if (!invalidPartitionsQueryUsed.isEmpty() && !cascadesContext.getConnectContext().getSessionVariable()
+                    .isEnableMaterializedViewUnionRewrite()) {
                 materializationContext.recordFailReason(queryStructInfo,
                         "Check partition query used validation fail",
                         () -> String.format("the partition used by query is invalid by materialized view,"
                                         + "invalid partition info query used is %s",
-                                materializationContext.getMTMV().getPartitions().stream()
-                                        .filter(partition ->
-                                                invalidPartitionsQueryUsed.contains(partition.getId()))
+                                invalidPartitionsQueryUsed.values().stream()
+                                        .map(Partition::getName)
                                         .collect(Collectors.toSet())));
                 continue;
             }
+            boolean partitionValid = invalidPartitionsQueryUsed.isEmpty();
+            if (checkCanUnionRewrite(invalidPartitionsQueryUsed, queryPlan, cascadesContext)) {
+                // construct filter on originalPlan
+                Map<TableIf, Set<Expression>> filterOnOriginPlan;
+                try {
+                    filterOnOriginPlan = Predicates.constructFilterByPartitions(invalidPartitionsQueryUsed,
+                            queryToViewSlotMapping);
+                    if (filterOnOriginPlan.isEmpty()) {
+                        materializationContext.recordFailReason(queryStructInfo,
+                                "construct invalid partition filter on query fail",
+                                () -> String.format("the invalid partitions used by query is %s, query plan is %s",
+                                        invalidPartitionsQueryUsed.values().stream().map(Partition::getName)
+                                                .collect(Collectors.toSet()),
+                                        queryStructInfo.getOriginalPlan().treeString()));
+                        continue;
+                    }
+                } catch (org.apache.doris.common.AnalysisException e) {
+                    materializationContext.recordFailReason(queryStructInfo,
+                            "construct invalid partition filter on query analysis fail",
+                            () -> String.format("the invalid partitions used by query is %s, query plan is %s",
+                                    invalidPartitionsQueryUsed.values().stream().map(Partition::getName)
+                                            .collect(Collectors.toSet()),
+                                    queryStructInfo.getOriginalPlan().treeString()));
+                    continue;
+                }
+                // The rewrittenPlan scans the materialized view, so remove the invalid partition ids from it
+                List<Plan> children = Lists.newArrayList(
+                        rewrittenPlan.accept(new InvalidPartitionRemover(), Pair.of(materializationContext.getMTMV(),
+                                invalidPartitionsQueryUsed.values().stream()
+                                        .map(Partition::getId).collect(Collectors.toSet()))),
+                        StructInfo.addFilterOnTableScan(queryPlan, filterOnOriginPlan, cascadesContext));
+                // Union query materialized view and source table
+                rewrittenPlan = new LogicalUnion(Qualifier.ALL,
+                        queryPlan.getOutput().stream().map(NamedExpression.class::cast).collect(Collectors.toList()),
+                        children.stream()
+                                .map(plan -> plan.getOutput().stream()
+                                        .map(slot -> (SlotReference) slot.toSlot()).collect(Collectors.toList()))
+                                .collect(Collectors.toList()),
+                        ImmutableList.of(),
+                        false,
+                        children);
+                partitionValid = true;
+            }
+            if (!partitionValid) {
+                materializationContext.recordFailReason(queryStructInfo,
+                        "materialized view partition is invalid union fail",
+                        () -> String.format("invalidPartitionsQueryUsed =  %s,\n query plan = %s",
+                                invalidPartitionsQueryUsed, queryPlan.treeString()));
+                continue;
+            }
+            rewrittenPlan = normalizeExpressions(rewrittenPlan, queryPlan);
+            if (!isOutputValid(queryPlan, rewrittenPlan)) {
+                LogicalProperties logicalProperties = rewrittenPlan.getLogicalProperties();
+                materializationContext.recordFailReason(queryStructInfo,
+                        "RewrittenPlan output logical properties is different with target group",
+                        () -> String.format("planOutput logical"
+                                        + " properties = %s,\n groupOutput logical properties = %s",
+                                logicalProperties, queryPlan.getLogicalProperties()));
+                continue;
+            }
             recordIfRewritten(queryStructInfo.getOriginalPlan(), materializationContext);
-            rewriteResults.add(finalRewrittenPlan);
+            rewriteResults.add(rewrittenPlan);
         }
         return rewriteResults;
     }
 
-    /**
-     * Rewrite by rules and try to make output is the same after optimize by rules
-     */
-    protected Plan rewriteByRules(CascadesContext cascadesContext, Plan rewrittenPlan, Plan originPlan) {
-        List<Slot> originOutputs = originPlan.getOutput();
-        if (originOutputs.size() != rewrittenPlan.getOutput().size()) {
-            return null;
-        }
-        Map<Slot, ExprId> originSlotToRewrittenExprId = Maps.newLinkedHashMap();
-        for (int i = 0; i < originOutputs.size(); i++) {
-            originSlotToRewrittenExprId.put(originOutputs.get(i), rewrittenPlan.getOutput().get(i).getExprId());
+    private boolean checkCanUnionRewrite(Multimap<Pair<MTMVPartitionInfo, PartitionInfo>, Partition>
+            invalidPartitionsQueryUsed, Plan queryPlan, CascadesContext cascadesContext) {
+        if (invalidPartitionsQueryUsed.isEmpty()
+                || !cascadesContext.getConnectContext().getSessionVariable().isEnableMaterializedViewUnionRewrite()) {
+            return false;
         }
-        // run rbo job on mv rewritten plan
-        CascadesContext rewrittenPlanContext = CascadesContext.initContext(
-                cascadesContext.getStatementContext(), rewrittenPlan,
-                cascadesContext.getCurrentJobContext().getRequiredProperties());
-        Rewriter.getWholeTreeRewriter(rewrittenPlanContext).execute();
-        rewrittenPlan = rewrittenPlanContext.getRewritePlan();
-
-        // for get right nullable after rewritten, we need this map
-        Map<ExprId, Slot> exprIdToNewRewrittenSlot = Maps.newLinkedHashMap();
-        for (Slot slot : rewrittenPlan.getOutput()) {
-            exprIdToNewRewrittenSlot.put(slot.getExprId(), slot);
+        // if mv can not offer valid partition data for query, bail out union rewrite
+        Map<Long, Set<PartitionItem>> mvRelatedTablePartitionMap = new LinkedHashMap<>();
+        invalidPartitionsQueryUsed.keySet().forEach(invalidPartition ->
+                mvRelatedTablePartitionMap.put(invalidPartition.key().getRelatedTableInfo().getTableId(),
+                        new HashSet<>()));
+        queryPlan.accept(new QueryScanPartitionsCollector(), mvRelatedTablePartitionMap);
+        Set<PartitionKeyDesc> partitionKeyDescSetQueryUsed = mvRelatedTablePartitionMap.values().stream()
+                .flatMap(Collection::stream)
+                .map(PartitionItem::toPartitionKeyDesc)
+                .collect(Collectors.toSet());
+        Set<PartitionKeyDesc> mvInvalidPartitionKeyDescSet = new HashSet<>();
+        for (Map.Entry<Pair<MTMVPartitionInfo, PartitionInfo>, Collection<Partition>> entry :
+                invalidPartitionsQueryUsed.asMap().entrySet()) {
+            entry.getValue().forEach(invalidPartition -> mvInvalidPartitionKeyDescSet.add(
+                    entry.getKey().value().getItem(invalidPartition.getId()).toPartitionKeyDesc()));
         }
+        return !mvInvalidPartitionKeyDescSet.containsAll(partitionKeyDescSetQueryUsed);
+    }
 
+    // Normalize expression such as nullable property and output slot id
+    protected Plan normalizeExpressions(Plan rewrittenPlan, Plan originPlan) {
         // normalize nullable
-        ImmutableList<NamedExpression> convertNullable = originOutputs.stream()
-                .map(s -> normalizeExpression(s, exprIdToNewRewrittenSlot.get(originSlotToRewrittenExprId.get(s))))
-                .collect(ImmutableList.toImmutableList());
-        return new LogicalProject<>(convertNullable, rewrittenPlan);
+        List<NamedExpression> normalizeProjects = new ArrayList<>();
+        for (int i = 0; i < originPlan.getOutput().size(); i++) {
+            normalizeProjects.add(normalizeExpression(originPlan.getOutput().get(i), rewrittenPlan.getOutput().get(i)));
+        }
+        return new LogicalProject<>(normalizeProjects, rewrittenPlan);
     }
 
     /**
@@ -303,35 +377,47 @@ public abstract class AbstractMaterializedViewRule implements ExplorationRuleFac
      * catalog relation.
      * Maybe only just some partitions is valid in materialized view, so we should check if the mv can
      * offer the partitions which query used or not.
+     *
+     * @return the invalid mv partitions used by the query, keyed by mv partition info
      */
-    protected Set<Long> calcInvalidPartitions(Plan rewrittenPlan, MaterializationContext materializationContext,
+    protected Multimap<Pair<MTMVPartitionInfo, PartitionInfo>, Partition> calcUsedInvalidMvPartitions(
+            Plan rewrittenPlan,
+            MaterializationContext materializationContext,
             CascadesContext cascadesContext) {
         // check partition is valid or not
         MTMV mtmv = materializationContext.getMTMV();
         PartitionInfo mvPartitionInfo = mtmv.getPartitionInfo();
         if (PartitionType.UNPARTITIONED.equals(mvPartitionInfo.getType())) {
             // if not partition, if rewrite success, it means mv is available
-            return ImmutableSet.of();
+            return ImmutableMultimap.of();
         }
         // check mv related table partition is valid or not
         MTMVPartitionInfo mvCustomPartitionInfo = mtmv.getMvPartitionInfo();
         BaseTableInfo relatedPartitionTable = mvCustomPartitionInfo.getRelatedTableInfo();
         if (relatedPartitionTable == null) {
-            return ImmutableSet.of();
+            return ImmutableMultimap.of();
         }
         // get mv valid partitions
         Set<Long> mvDataValidPartitionIdSet = MTMVRewriteUtil.getMTMVCanRewritePartitions(mtmv,
                         cascadesContext.getConnectContext(), System.currentTimeMillis()).stream()
                 .map(Partition::getId)
                 .collect(Collectors.toSet());
-        Set<Long> queryUsedPartitionIdSet = rewrittenPlan.collectToList(node -> node instanceof LogicalOlapScan
+        // get partitions query used
+        Set<Long> mvPartitionSetQueryUsed = rewrittenPlan.collectToList(node -> node instanceof LogicalOlapScan
                         && Objects.equals(((CatalogRelation) node).getTable().getName(), mtmv.getName()))
                 .stream()
                 .map(node -> ((LogicalOlapScan) node).getSelectedPartitionIds())
                 .flatMap(Collection::stream)
                 .collect(Collectors.toSet());
-        queryUsedPartitionIdSet.removeAll(mvDataValidPartitionIdSet);
-        return queryUsedPartitionIdSet;
+        // get invalid partition ids
+        Set<Long> invalidMvPartitionIdSet = new HashSet<>(mvPartitionSetQueryUsed);
+        invalidMvPartitionIdSet.removeAll(mvDataValidPartitionIdSet);
+        ImmutableMultimap.Builder<Pair<MTMVPartitionInfo, PartitionInfo>, Partition> invalidPartitionMapBuilder =
+                ImmutableMultimap.builder();
+        Pair<MTMVPartitionInfo, PartitionInfo> partitionInfo = Pair.of(mvCustomPartitionInfo, mvPartitionInfo);
+        invalidMvPartitionIdSet.forEach(invalidPartitionId ->
+                        invalidPartitionMapBuilder.put(partitionInfo, mtmv.getPartition(invalidPartitionId)));
+        return invalidPartitionMapBuilder.build();
     }
 
     /**
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java
index ac4b9ccad38..46d0adde06e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java
@@ -27,6 +27,7 @@ import org.apache.doris.mtmv.MTMVRelatedTableIf;
 import org.apache.doris.nereids.CascadesContext;
 import org.apache.doris.nereids.memo.Group;
 import org.apache.doris.nereids.memo.StructInfoMap;
+import org.apache.doris.nereids.trees.expressions.ExprId;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.NamedExpression;
 import org.apache.doris.nereids.trees.expressions.Slot;
@@ -49,15 +50,17 @@ import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /**
@@ -147,13 +150,19 @@ public class MaterializedViewUtils {
             StructInfoMap structInfoMap = ownerGroup.getstructInfoMap();
             structInfoMap.refresh(ownerGroup);
             Set<BitSet> queryTableSets = structInfoMap.getTableMaps();
+            ImmutableList.Builder<StructInfo> structInfosBuilder = ImmutableList.builder();
             if (!queryTableSets.isEmpty()) {
-                return queryTableSets.stream()
-                        // Just construct the struct info which mv table set contains all the query table set
-                        .filter(queryTableSet -> materializedViewTableSet.isEmpty()
-                                || StructInfo.containsAll(materializedViewTableSet, queryTableSet))
-                        .map(tableMap -> structInfoMap.getStructInfo(tableMap, tableMap, ownerGroup, plan))
-                        .collect(Collectors.toList());
+                for (BitSet queryTableSet : queryTableSets) {
+                    if (!materializedViewTableSet.isEmpty()
+                            && !StructInfo.containsAll(materializedViewTableSet, queryTableSet)) {
+                        continue;
+                    }
+                    StructInfo structInfo = structInfoMap.getStructInfo(queryTableSet, queryTableSet, ownerGroup, plan);
+                    if (structInfo != null) {
+                        structInfosBuilder.add(structInfo);
+                    }
+                }
+                return structInfosBuilder.build();
             }
         }
         // if plan doesn't belong to any group, construct it directly
@@ -172,8 +181,8 @@ public class MaterializedViewUtils {
                 materializedView,
                 ImmutableList.of(materializedView.getQualifiedDbName()),
                 // this must be empty, or it will be used to sample
-                Lists.newArrayList(),
-                Lists.newArrayList(),
+                ImmutableList.of(),
+                ImmutableList.of(),
                 Optional.empty());
         mvScan = mvScan.withMaterializedIndexSelected(PreAggStatus.on(), materializedView.getBaseIndexId());
         List<NamedExpression> mvProjects = mvScan.getOutput().stream().map(NamedExpression.class::cast)
@@ -181,6 +190,43 @@ public class MaterializedViewUtils {
         return new LogicalProject<Plan>(mvProjects, mvScan);
     }
 
+    /**
+     * Optimize by rules. This supports custom rules: the caller defines a different rewriter for each set of
+     * rules.
+     */
+    public static Plan rewriteByRules(
+            CascadesContext cascadesContext,
+            Function<CascadesContext, Plan> planRewriter,
+            Plan rewrittenPlan, Plan originPlan) {
+        List<Slot> originOutputs = originPlan.getOutput();
+        if (originOutputs.size() != rewrittenPlan.getOutput().size()) {
+            return null;
+        }
+        // After RBO, slot order may change, so originalRewrittenPlanExprIds records the output slot order
+        // of the rewritten plan before RBO
+        List<ExprId> originalRewrittenPlanExprIds =
+                rewrittenPlan.getOutput().stream().map(Slot::getExprId).collect(Collectors.toList());
+        // run rbo job on mv rewritten plan
+        CascadesContext rewrittenPlanContext = CascadesContext.initContext(
+                cascadesContext.getStatementContext(), rewrittenPlan,
+                cascadesContext.getCurrentJobContext().getRequiredProperties());
+        rewrittenPlan = planRewriter.apply(rewrittenPlanContext);
+        Map<ExprId, Slot> exprIdToNewRewrittenSlot = Maps.newLinkedHashMap();
+        for (Slot slot : rewrittenPlan.getOutput()) {
+            exprIdToNewRewrittenSlot.put(slot.getExprId(), slot);
+        }
+        List<ExprId> rewrittenPlanExprIds = rewrittenPlan.getOutput().stream()
+                .map(Slot::getExprId).collect(Collectors.toList());
+        // If project order doesn't change, return rewrittenPlan directly
+        if (originalRewrittenPlanExprIds.equals(rewrittenPlanExprIds)) {
+            return rewrittenPlan;
+        }
+        // If project order changes, return rewrittenPlan with reordered projects
+        return new LogicalProject<>(originalRewrittenPlanExprIds.stream()
+                .map(exprId -> (NamedExpression) exprIdToNewRewrittenSlot.get(exprId)).collect(Collectors.toList()),
+                rewrittenPlan);
+    }
+
     private static final class TableQueryOperatorChecker extends DefaultPlanVisitor<Boolean, Void> {
         public static final TableQueryOperatorChecker INSTANCE = new TableQueryOperatorChecker();
 
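The reordering step at the end of rewriteByRules can be illustrated in isolation. The following is a minimal sketch, not the Doris implementation: the Slot class here is a hypothetical stand-in for the Nereids Slot, and integer ids stand in for ExprId. It assumes every id recorded before the rewrite is still present afterwards; a missing id would yield a null entry.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class OutputOrderSketch {
    /** Hypothetical stand-in for a plan output slot: an expression id plus a name. */
    static final class Slot {
        final int exprId;
        final String name;

        Slot(int exprId, String name) {
            this.exprId = exprId;
            this.name = name;
        }
    }

    /** Re-orders the slots produced by a rewrite so that they follow idsBeforeRewrite. */
    static List<Slot> restoreOrder(List<Integer> idsBeforeRewrite, List<Slot> slotsAfterRewrite) {
        Map<Integer, Slot> idToSlot = new LinkedHashMap<>();
        List<Integer> idsAfterRewrite = new ArrayList<>();
        for (Slot slot : slotsAfterRewrite) {
            idToSlot.put(slot.exprId, slot);
            idsAfterRewrite.add(slot.exprId);
        }
        // Order unchanged: hand back the rewritten output as it is.
        if (idsBeforeRewrite.equals(idsAfterRewrite)) {
            return slotsAfterRewrite;
        }
        // Order changed: rebuild the output in the recorded order.
        List<Slot> reordered = new ArrayList<>();
        for (Integer id : idsBeforeRewrite) {
            reordered.add(idToSlot.get(id));
        }
        return reordered;
    }

    /** Helper for printing and comparing: the slot names in order. */
    static List<String> names(List<Slot> slots) {
        List<String> result = new ArrayList<>();
        for (Slot slot : slots) {
            result.add(slot.name);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Slot> afterRewrite = Arrays.asList(
                new Slot(3, "sum_total"), new Slot(1, "l_shipdate"), new Slot(2, "o_orderdate"));
        System.out.println(names(restoreOrder(Arrays.asList(1, 2, 3), afterRewrite)));
    }
}
```

In the commit itself the reordered slots are wrapped in a LogicalProject on top of the rewritten plan rather than returned as a list.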
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/Predicates.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/Predicates.java
index 139230be5d4..c801e683d65 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/Predicates.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/Predicates.java
@@ -17,8 +17,15 @@
 
 package org.apache.doris.nereids.rules.exploration.mv;
 
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.PartitionInfo;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Pair;
+import org.apache.doris.mtmv.MTMVPartitionInfo;
 import org.apache.doris.nereids.CascadesContext;
 import org.apache.doris.nereids.rules.exploration.mv.mapping.EquivalenceClassSetMapping;
+import org.apache.doris.nereids.rules.exploration.mv.mapping.Mapping.MappedSlot;
 import org.apache.doris.nereids.rules.exploration.mv.mapping.SlotMapping;
 import org.apache.doris.nereids.rules.expression.ExpressionNormalization;
 import org.apache.doris.nereids.rules.expression.ExpressionOptimization;
@@ -27,13 +34,17 @@ import org.apache.doris.nereids.trees.expressions.EqualTo;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.SlotReference;
 import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral;
+import org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand;
 import org.apache.doris.nereids.util.ExpressionUtils;
 import org.apache.doris.nereids.util.Utils;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -220,6 +231,47 @@ public class Predicates {
         return Utils.toSqlString("Predicates", "pulledUpPredicates", pulledUpPredicates);
     }
 
+    /** Construct filters by partition
+     * @param partitions the partitions from which the filters should be constructed
+     * @param queryToViewSlotMapping the filters are constructed on slots that belong to this slot mapping
+     * */
+    public static Map<TableIf, Set<Expression>> constructFilterByPartitions(
+            Multimap<Pair<MTMVPartitionInfo, PartitionInfo>, Partition> partitions,
+            SlotMapping queryToViewSlotMapping) throws AnalysisException {
+        Map<TableIf, Set<Expression>> constructedFilterMap = new HashMap<>();
+        for (Map.Entry<Pair<MTMVPartitionInfo, PartitionInfo>, Collection<Partition>> entry :
+                partitions.asMap().entrySet()) {
+            // Get the base table partition column that the mv partition column is related to
+            String relatedCol = entry.getKey().key().getRelatedCol();
+            TableIf relatedTableInfo = entry.getKey().key().getRelatedTable();
+            // Find the query slot that the mv partition col is mapped to
+            Optional<MappedSlot> partitionSlotQueryUsed = queryToViewSlotMapping.getRelationSlotMap()
+                    .keySet()
+                    .stream()
+                    .filter(mappedSlot -> mappedSlot.getSlot().isColumnFromTable()
+                            && mappedSlot.getSlot().getName().equals(relatedCol)
+                            && mappedSlot.getBelongedRelation() != null
+                            && mappedSlot.getBelongedRelation().getTable().getId() == relatedTableInfo.getId())
+                    .findFirst();
+            if (!partitionSlotQueryUsed.isPresent()) {
+                return ImmutableMap.of();
+            }
+            // Construct the filter which should be added on the query base table;
+            // once data roll up is supported, this method should stay logically consistent with partition mapping
+            Set<Expression> partitionExpressions = UpdateMvByPartitionCommand.constructPredicates(
+                    // get mv partition items
+                    entry.getValue().stream()
+                            .map(partition -> entry.getKey().value().getItem(partition.getId()))
+                            .collect(Collectors.toSet()),
+                    partitionSlotQueryUsed.get().getSlot());
+            // Put partition expressions on query base table
+            constructedFilterMap.computeIfPresent(relatedTableInfo,
+                    (key, existExpressions) -> Sets.union(existExpressions, partitionExpressions));
+            constructedFilterMap.computeIfAbsent(relatedTableInfo, key -> partitionExpressions);
+        }
+        return constructedFilterMap;
+    }
+
     /**
      * The split different representation for predicate expression, such as equal, range and residual predicate.
      */
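The per-table accumulation at the end of constructFilterByPartitions (union into an existing entry, otherwise create the entry) can also be expressed with a single Map.merge call. The sketch below is illustrative only: table names and predicate strings are hypothetical stand-ins for TableIf and Expression, and the group helper exists just for this example.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class FilterMapSketch {
    /** Hypothetical helper: one (table, predicates) group, as produced per partition group. */
    static Map.Entry<String, Set<String>> group(String table, String... predicates) {
        Set<String> set = new LinkedHashSet<>(Arrays.asList(predicates));
        return new SimpleEntry<String, Set<String>>(table, set);
    }

    /** Collects the predicates of all groups into one set per table. */
    static Map<String, Set<String>> collect(List<Map.Entry<String, Set<String>>> groups) {
        Map<String, Set<String>> filters = new HashMap<>();
        for (Map.Entry<String, Set<String>> entry : groups) {
            // merge() inserts the copied set if the table is absent,
            // otherwise it unions the new predicates into the existing set.
            filters.merge(entry.getKey(), new LinkedHashSet<>(entry.getValue()),
                    (existing, added) -> {
                        existing.addAll(added);
                        return existing;
                    });
        }
        return filters;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Set<String>>> groups = new ArrayList<>();
        groups.add(group("lineitem", "l_shipdate in p_20231017"));
        groups.add(group("orders", "o_orderdate in p_20231017"));
        groups.add(group("lineitem", "l_shipdate in p_20231018"));
        System.out.println(collect(groups));
    }
}
```

Unlike Sets.union, which returns a view over both inputs, this variant copies the predicates into a mutable set owned by the map.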
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/StructInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/StructInfo.java
index 604e7853d48..4979f11c28e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/StructInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/StructInfo.java
@@ -17,13 +17,21 @@
 
 package org.apache.doris.nereids.rules.exploration.mv;
 
+import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.PartitionInfo;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.jobs.executor.Rewriter;
 import org.apache.doris.nereids.jobs.joinorder.hypergraph.HyperGraph;
 import org.apache.doris.nereids.jobs.joinorder.hypergraph.edge.JoinEdge;
 import org.apache.doris.nereids.jobs.joinorder.hypergraph.node.StructInfoNode;
 import org.apache.doris.nereids.memo.Group;
 import org.apache.doris.nereids.memo.GroupExpression;
 import org.apache.doris.nereids.rules.exploration.mv.Predicates.SplitPredicate;
+import org.apache.doris.nereids.trees.copier.DeepCopierContext;
+import org.apache.doris.nereids.trees.copier.LogicalPlanDeepCopier;
 import org.apache.doris.nereids.trees.expressions.EqualTo;
 import org.apache.doris.nereids.trees.expressions.ExprId;
 import org.apache.doris.nereids.trees.expressions.Expression;
@@ -38,11 +46,16 @@ import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;
 import org.apache.doris.nereids.trees.plans.algebra.Filter;
 import org.apache.doris.nereids.trees.plans.algebra.Join;
 import org.apache.doris.nereids.trees.plans.algebra.Project;
+import org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand.PredicateAdder;
 import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
 import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
+import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
 import org.apache.doris.nereids.trees.plans.logical.LogicalSort;
+import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
 import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor;
 import org.apache.doris.nereids.trees.plans.visitor.ExpressionLineageReplacer;
 import org.apache.doris.nereids.util.ExpressionUtils;
@@ -584,4 +597,61 @@ public class StructInfo {
             return true;
         }
     }
+
+    /**
+     * Remove invalid partitions from the selected partitions of the materialized view scan
+     */
+    public static class InvalidPartitionRemover extends DefaultPlanRewriter<Pair<MTMV, Set<Long>>> {
+        // materialized view scan is always LogicalOlapScan, so just handle LogicalOlapScan
+        @Override
+        public Plan visitLogicalOlapScan(LogicalOlapScan olapScan, Pair<MTMV, Set<Long>> context) {
+            if (olapScan.getTable().getName().equals(context.key().getName())) {
+                List<Long> selectedPartitionIds = olapScan.getSelectedPartitionIds();
+                return olapScan.withSelectedPartitionIds(selectedPartitionIds.stream()
+                        .filter(partitionId -> !context.value().contains(partitionId))
+                        .collect(Collectors.toList()));
+            }
+            return olapScan;
+        }
+    }
+
+    /** Collect the partitions that each scan uses, for the given tables */
+    public static class QueryScanPartitionsCollector extends DefaultPlanVisitor<Plan, Map<Long, Set<PartitionItem>>> {
+        @Override
+        public Plan visitLogicalCatalogRelation(LogicalCatalogRelation catalogRelation,
+                Map<Long, Set<PartitionItem>> context) {
+            TableIf table = catalogRelation.getTable();
+            if (!context.containsKey(table.getId())) {
+                return catalogRelation;
+            }
+            // Only olap table partitions can be checked currently
+            if (catalogRelation instanceof LogicalOlapScan) {
+                LogicalOlapScan logicalOlapScan = (LogicalOlapScan) catalogRelation;
+                PartitionInfo partitionInfo = logicalOlapScan.getTable().getPartitionInfo();
+                logicalOlapScan.getSelectedPartitionIds().stream()
+                        .map(partitionInfo::getItem)
+                        .forEach(partitionItem -> context.computeIfPresent(table.getId(), (key, oldValue) -> {
+                            oldValue.add(partitionItem);
+                            return oldValue;
+                        }));
+            }
+            return catalogRelation;
+        }
+    }
+
+    /** Add filters on table scans according to the table filter map */
+    public static Plan addFilterOnTableScan(Plan queryPlan, Map<TableIf, Set<Expression>> filterOnOriginPlan,
+            CascadesContext parentCascadesContext) {
+        // Firstly, construct filter from invalid partition, this filter should be added on origin plan
+        Plan queryPlanWithUnionFilter = queryPlan.accept(new PredicateAdder(), filterOnOriginPlan);
+        // Deep copy the plan so that its output differs from the later union output; identical outputs may
+        // cause incorrect execution
+        queryPlanWithUnionFilter = new LogicalPlanDeepCopier().deepCopy(
+                (LogicalPlan) queryPlanWithUnionFilter, new DeepCopierContext());
+        // rbo rewrite after adding filter on origin plan
+        return MaterializedViewUtils.rewriteByRules(parentCascadesContext, context -> {
+            Rewriter.getWholeTreeRewriter(context).execute();
+            return context.getRewritePlan();
+        }, queryPlanWithUnionFilter, queryPlan);
+    }
 }
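What InvalidPartitionRemover does to the materialized view scan amounts to filtering the selected partition ids. A minimal sketch follows, using plain collections instead of LogicalOlapScan; the class and method names are hypothetical and only illustrate the filtering step.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class InvalidPartitionSketch {
    /** Keeps only the selected partition ids that are not marked invalid. */
    static List<Long> removeInvalid(List<Long> selectedPartitionIds, Set<Long> invalidPartitionIds) {
        return selectedPartitionIds.stream()
                .filter(partitionId -> !invalidPartitionIds.contains(partitionId))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Long> selected = Arrays.asList(10L, 11L, 12L);
        Set<Long> invalid = new HashSet<>(Arrays.asList(11L));
        // The mv scan keeps its valid partitions; the data of the removed
        // partition is expected to come from the base tables through the union branch.
        System.out.println(removeInvalid(selected, invalid));
    }
}
```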
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/EliminateSort.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/EliminateSort.java
index 9dd80771785..152d4c5d92e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/EliminateSort.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/EliminateSort.java
@@ -37,7 +37,7 @@ import java.util.List;
 public class EliminateSort extends DefaultPlanRewriter<Boolean> implements CustomRewriter {
     @Override
     public Plan rewriteRoot(Plan plan, JobContext jobContext) {
-        Boolean eliminateSort = false;
+        Boolean eliminateSort = true;
         return plan.accept(this, eliminateSort);
     }
 
@@ -76,7 +76,7 @@ public class EliminateSort extends DefaultPlanRewriter<Boolean> implements Custo
             // eliminate sort
             return visit(logicalSink, true);
         }
-        return skipEliminateSort(logicalSink, eliminateSort);
+        return skipEliminateSort(logicalSink, false);
     }
 
     private Plan skipEliminateSort(Plan plan, Boolean eliminateSort) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
index 67864187bb2..d63aee6ea70 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
@@ -41,6 +41,7 @@ import org.apache.doris.nereids.trees.expressions.literal.NullLiteral;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.algebra.Sink;
 import org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
@@ -120,18 +121,27 @@ public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand {
      */
     @VisibleForTesting
     public static Set<Expression> constructPredicates(Set<PartitionItem> partitions, String colName) {
-        Set<Expression> predicates = new HashSet<>();
         UnboundSlot slot = new UnboundSlot(colName);
+        return constructPredicates(partitions, slot);
+    }
+
+    /**
+     * construct predicates for partition items, the min key is the min key of range items.
+     * For list partition or less than partition items, the min key is null.
+     */
+    @VisibleForTesting
+    public static Set<Expression> constructPredicates(Set<PartitionItem> partitions, Slot colSlot) {
+        Set<Expression> predicates = new HashSet<>();
         if (partitions.isEmpty()) {
             return Sets.newHashSet(BooleanLiteral.TRUE);
         }
         if (partitions.iterator().next() instanceof ListPartitionItem) {
             for (PartitionItem item : partitions) {
-                predicates.add(convertListPartitionToIn(item, slot));
+                predicates.add(convertListPartitionToIn(item, colSlot));
             }
         } else {
             for (PartitionItem item : partitions) {
-                predicates.add(convertRangePartitionToCompare(item, slot));
+                predicates.add(convertRangePartitionToCompare(item, colSlot));
             }
         }
         return predicates;
@@ -186,7 +196,10 @@ public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand {
         return predicate;
     }
 
-    static class PredicateAdder extends DefaultPlanRewriter<Map<TableIf, Set<Expression>>> {
+    /**
+     * Add predicates on base tables when the mv can be updated by partition
+     */
+    public static class PredicateAdder extends DefaultPlanRewriter<Map<TableIf, Set<Expression>>> {
         @Override
         public Plan visitUnboundRelation(UnboundRelation unboundRelation, Map<TableIf, Set<Expression>> predicates) {
             List<String> tableQualifier = RelationUtil.getQualifierName(ConnectContext.get(),
@@ -198,5 +211,16 @@ public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand {
             }
             return unboundRelation;
         }
+
+        @Override
+        public Plan visitLogicalCatalogRelation(LogicalCatalogRelation catalogRelation,
+                Map<TableIf, Set<Expression>> predicates) {
+            TableIf table = catalogRelation.getTable();
+            if (predicates.containsKey(table)) {
+                return new LogicalFilter<>(ImmutableSet.of(ExpressionUtils.or(predicates.get(table))),
+                        catalogRelation);
+            }
+            return catalogRelation;
+        }
     }
 }
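The combination of constructPredicates and PredicateAdder (one predicate per partition item, OR-ed together in a filter above the base table scan) can be sketched with plain predicates. This is an illustration under stated assumptions, not the Doris code: Range is a hypothetical stand-in for a range partition item, ranges are assumed to be half-open [lower, upper), and a null lower bound models a "less than" partition.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.IntPredicate;

class PartitionPredicateSketch {
    /** Hypothetical range partition: [lower, upper), where a null lower bound means unbounded. */
    static final class Range {
        final Integer lower;
        final int upper;

        Range(Integer lower, int upper) {
            this.lower = lower;
            this.upper = upper;
        }
    }

    /** Builds one predicate per range and ORs them; no partitions means "always true". */
    static IntPredicate toPredicate(List<Range> ranges) {
        if (ranges.isEmpty()) {
            // Mirrors the BooleanLiteral.TRUE result for an empty partition set.
            return value -> true;
        }
        IntPredicate combined = value -> false;
        for (Range range : ranges) {
            IntPredicate inRange = value ->
                    (range.lower == null || value >= range.lower) && value < range.upper;
            combined = combined.or(inRange);
        }
        return combined;
    }

    public static void main(String[] args) {
        IntPredicate filter = toPredicate(Arrays.asList(new Range(null, 10), new Range(20, 30)));
        for (int value : new int[] {5, 10, 15, 20, 30}) {
            System.out.println(value + " -> " + filter.test(value));
        }
    }
}
```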
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/ExpressionLineageReplacer.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/ExpressionLineageReplacer.java
index 2060718ec13..1252f3b4bbf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/ExpressionLineageReplacer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/ExpressionLineageReplacer.java
@@ -45,12 +45,12 @@ import java.util.stream.Collectors;
  * Get from rewrite plan and can also get from plan struct info, if from plan struct info it depends on
  * the nodes from graph.
  */
-public class ExpressionLineageReplacer extends DefaultPlanVisitor<Void, ExpressionReplaceContext> {
+public class ExpressionLineageReplacer extends DefaultPlanVisitor<Expression, ExpressionReplaceContext> {
 
     public static final ExpressionLineageReplacer INSTANCE = new ExpressionLineageReplacer();
 
     @Override
-    public Void visit(Plan plan, ExpressionReplaceContext context) {
+    public Expression visit(Plan plan, ExpressionReplaceContext context) {
         List<? extends Expression> expressions = plan.getExpressions();
         Map<ExprId, Expression> targetExpressionMap = context.getExprIdExpressionMap();
         // Filter the namedExpression used by target and collect the namedExpression
@@ -62,7 +62,7 @@ public class ExpressionLineageReplacer extends DefaultPlanVisitor<Void, Expressi
     }
 
     @Override
-    public Void visitGroupPlan(GroupPlan groupPlan, ExpressionReplaceContext context) {
+    public Expression visitGroupPlan(GroupPlan groupPlan, ExpressionReplaceContext context) {
         Group group = groupPlan.getGroup();
         if (group == null) {
             return visit(groupPlan, context);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 5b0512683aa..0148d30508c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -521,6 +521,9 @@ public class SessionVariable implements Serializable, Writable {
     public static final String MATERIALIZED_VIEW_REWRITE_SUCCESS_CANDIDATE_NUM
             = "materialized_view_rewrite_success_candidate_num";
 
+    public static final String ENABLE_MATERIALIZED_VIEW_UNION_REWRITE
+            = "enable_materialized_view_union_rewrite";
+
     public static final String CREATE_TABLE_PARTITION_MAX_NUM
             = "create_table_partition_max_num";
 
@@ -1638,6 +1641,13 @@ public class SessionVariable implements Serializable, Writable {
                     "The max candidate num which participate in CBO when using asynchronous materialized views"})
     public int materializedViewRewriteSuccessCandidateNum = 3;
 
+    @VariableMgr.VarAttr(name = ENABLE_MATERIALIZED_VIEW_UNION_REWRITE, needForward = true,
+            description = {"当物化视图不足以提供查询的全部数据时,是否允许基表和物化视图 union 来响应查询",
+                    "When the materialized view is not enough to provide all the data for the query, "
+                            + "whether to allow the union of the base table and the materialized view to "
+                            + "respond to the query"})
+    public boolean enableMaterializedViewUnionRewrite = false;
+
     @VariableMgr.VarAttr(name = CREATE_TABLE_PARTITION_MAX_NUM, needForward = true,
             description = {"建表时创建分区的最大数量",
                     "The maximum number of partitions created during table creation"})
@@ -3674,6 +3684,10 @@ public class SessionVariable implements Serializable, Writable {
         return materializedViewRewriteSuccessCandidateNum;
     }
 
+    public boolean isEnableMaterializedViewUnionRewrite() {
+        return enableMaterializedViewUnionRewrite;
+    }
+
     public int getCreateTablePartitionMaxNum() {
         return createTablePartitionMaxNum;
     }
diff --git a/regression-test/data/nereids_rules_p0/mv/partition_mv_rewrite.out b/regression-test/data/nereids_rules_p0/mv/partition_mv_rewrite.out
new file mode 100644
index 00000000000..8559da6305b
--- /dev/null
+++ b/regression-test/data/nereids_rules_p0/mv/partition_mv_rewrite.out
@@ -0,0 +1,29 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !query_all_direct_before --
+2023-10-17	2023-10-17	2	3	199.00
+2023-10-18	2023-10-18	2	3	109.20
+2023-10-19	2023-10-19	2	3	99.50
+
+-- !query_all_direct_after --
+2023-10-17	2023-10-17	2	3	199.00
+2023-10-18	2023-10-18	2	3	109.20
+2023-10-19	2023-10-19	2	3	99.50
+
+-- !query_partition_before --
+2023-10-18	2023-10-18	2	3	109.20
+2023-10-19	2023-10-19	2	3	99.50
+
+-- !query_partition_after --
+2023-10-18	2023-10-18	2	3	109.20
+2023-10-19	2023-10-19	2	3	99.50
+
+-- !query_all_before --
+2023-10-17	2023-10-17	2	3	199.00
+2023-10-18	2023-10-18	2	3	109.20
+2023-10-19	2023-10-19	2	3	99.50
+
+-- !query_all_after --
+2023-10-17	2023-10-17	2	3	199.00
+2023-10-18	2023-10-18	2	3	109.20
+2023-10-19	2023-10-19	2	3	99.50
+
diff --git a/regression-test/suites/nereids_rules_p0/mv/dimension/dimension_self_conn.groovy b/regression-test/suites/nereids_rules_p0/mv/dimension/dimension_self_conn.groovy
new file mode 100644
index 00000000000..9a2e893f7e4
--- /dev/null
+++ b/regression-test/suites/nereids_rules_p0/mv/dimension/dimension_self_conn.groovy
@@ -0,0 +1,564 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/*
+This suite tests the self connection (self join) case
+ */
+suite("partition_mv_rewrite_dimension_self_conn") {
+    String db = context.config.getDbNameByFile(context.file)
+    sql "use ${db}"
+    sql "SET enable_nereids_planner=true"
+    sql "SET enable_fallback_to_original_planner=false"
+    sql "SET enable_materialized_view_rewrite=true"
+    sql "SET enable_nereids_timeout = false"
+
+    sql """
+    drop table if exists orders_self_conn
+    """
+
+    sql """CREATE TABLE `orders_self_conn` (
+      `o_orderkey` BIGINT NULL,
+      `o_custkey` INT NULL,
+      `o_orderstatus` VARCHAR(1) NULL,
+      `o_totalprice` DECIMAL(15, 2)  NULL,
+      `o_orderpriority` VARCHAR(15) NULL,
+      `o_clerk` VARCHAR(15) NULL,
+      `o_shippriority` INT NULL,
+      `o_comment` VARCHAR(79) NULL,
+      `o_orderdate` DATE not NULL
+    ) ENGINE=OLAP
+    DUPLICATE KEY(`o_orderkey`, `o_custkey`)
+    COMMENT 'OLAP'
+    auto partition by range (date_trunc(`o_orderdate`, 'day')) ()
+    DISTRIBUTED BY HASH(`o_orderkey`) BUCKETS 96
+    PROPERTIES (
+    "replication_allocation" = "tag.location.default: 1"
+    );"""
+
+    sql """
+    drop table if exists lineitem_self_conn
+    """
+
+    sql """CREATE TABLE `lineitem_self_conn` (
+      `l_orderkey` BIGINT NULL,
+      `l_linenumber` INT NULL,
+      `l_partkey` INT NULL,
+      `l_suppkey` INT NULL,
+      `l_quantity` DECIMAL(15, 2) NULL,
+      `l_extendedprice` DECIMAL(15, 2) NULL,
+      `l_discount` DECIMAL(15, 2) NULL,
+      `l_tax` DECIMAL(15, 2) NULL,
+      `l_returnflag` VARCHAR(1) NULL,
+      `l_linestatus` VARCHAR(1) NULL,
+      `l_commitdate` DATE NULL,
+      `l_receiptdate` DATE NULL,
+      `l_shipinstruct` VARCHAR(25) NULL,
+      `l_shipmode` VARCHAR(10) NULL,
+      `l_comment` VARCHAR(44) NULL,
+      `l_shipdate` DATE not NULL
+    ) ENGINE=OLAP
+    DUPLICATE KEY(l_orderkey, l_linenumber, l_partkey, l_suppkey )
+    COMMENT 'OLAP'
+    auto partition by range (date_trunc(`l_shipdate`, 'day')) ()
+    DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
+    PROPERTIES (
+    "replication_allocation" = "tag.location.default: 1"
+    );"""
+
+    sql """
+    insert into orders_self_conn values 
+    (null, 1, 'k', 99.5, 'a', 'b', 1, 'yy', '2023-10-17'),
+    (1, null, 'o', 109.2, 'c','d',2, 'mm', '2023-10-17'),
+    (3, 3, null, 99.5, 'a', 'b', 1, 'yy', '2023-10-19'),
+    (1, 2, 'o', null, 'a', 'b', 1, 'yy', '2023-10-20'),
+    (2, 3, 'k', 109.2, null,'d',2, 'mm', '2023-10-21'),
+    (3, 1, 'k', 99.5, 'a', null, 1, 'yy', '2023-10-22'),
+    (1, 3, 'o', 99.5, 'a', 'b', null, 'yy', '2023-10-19'),
+    (2, 1, 'o', 109.2, 'c','d',2, null, '2023-10-18'),
+    (3, 2, 'k', 99.5, 'a', 'b', 1, 'yy', '2023-10-17'),
+    (4, 5, 'k', 99.5, 'a', 'b', 1, 'yy', '2023-10-19'); 
+    """
+
+    sql """
+    insert into lineitem_self_conn values 
+    (null, 1, 2, 3, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy', '2023-10-17'),
+    (1, null, 3, 1, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-18', '2023-10-18', 'a', 'b', 'yyyyyyyyy', '2023-10-17'),
+    (3, 3, null, 2, 7.5, 8.5, 9.5, 10.5, 'k', 'o', '2023-10-19', '2023-10-19', 'c', 'd', 'xxxxxxxxx', '2023-10-19'),
+    (1, 2, 3, null, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy', '2023-10-17'),
+    (2, 3, 2, 1, 5.5, 6.5, 7.5, 8.5, 'o', 'k', null, '2023-10-18', 'a', 'b', 'yyyyyyyyy', '2023-10-18'),
+    (3, 1, 1, 2, 7.5, 8.5, 9.5, 10.5, 'k', 'o', '2023-10-19', null, 'c', 'd', 'xxxxxxxxx', '2023-10-19'),
+    (1, 3, 2, 2, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy', '2023-10-17');
+    """
+
+    sql """analyze table orders_self_conn with sync;"""
+    sql """analyze table lineitem_self_conn with sync;"""
+
+    def create_mv = { mv_name, mv_sql ->
+        sql """DROP MATERIALIZED VIEW IF EXISTS ${mv_name};"""
+        sql """DROP TABLE IF EXISTS ${mv_name}"""
+        sql"""
+        CREATE MATERIALIZED VIEW ${mv_name} 
+        BUILD IMMEDIATE REFRESH AUTO ON MANUAL 
+        DISTRIBUTED BY RANDOM BUCKETS 2 
+        PROPERTIES ('replication_num' = '1') 
+        AS  
+        ${mv_sql}
+        """
+    }
+
+    def compare_res = { def stmt ->
+        sql "SET enable_materialized_view_rewrite=false"
+        def origin_res = sql stmt
+        logger.info("origin_res: " + origin_res)
+        sql "SET enable_materialized_view_rewrite=true"
+        def mv_origin_res = sql stmt
+        logger.info("mv_origin_res: " + mv_origin_res)
+        assertTrue((mv_origin_res == [] && origin_res == []) || (mv_origin_res.size() == origin_res.size()))
+        for (int row = 0; row < mv_origin_res.size(); row++) {
+            assertTrue(mv_origin_res[row].size() == origin_res[row].size())
+            for (int col = 0; col < mv_origin_res[row].size(); col++) {
+                assertTrue(mv_origin_res[row][col] == origin_res[row][col])
+            }
+        }
+    }
+
+
+    // join direction
+    def mv_name_1 = "mv_self_conn"
+
+    def join_direction_mv_1 = """
+        select t1.l_Shipdate, t2.l_partkey, t1.l_suppkey 
+        from lineitem_self_conn as t1 
+        left join lineitem_self_conn as t2 
+        on t1.l_orderkey = t2.l_orderkey
+        """
+
+    create_mv(mv_name_1, join_direction_mv_1)
+    def job_name_1 = getJobName(db, mv_name_1)
+    waitingMTMVTaskFinished(job_name_1)
+
+    def join_direction_sql_1 = """
+        select t1.L_SHIPDATE 
+        from lineitem_self_conn as t1 
+        left join lineitem_self_conn as t2 
+        on t1.l_orderkey = t2.l_orderkey
+        """
+    explain {
+        sql("${join_direction_sql_1}")
+        contains "${mv_name_1}(${mv_name_1})"
+    }
+    compare_res(join_direction_sql_1 + " order by 1")
+    sql """DROP MATERIALIZED VIEW IF EXISTS ${mv_name_1};"""
+
+    // join filter position
+    def join_filter_stmt_1 = """select t1.L_SHIPDATE, t2.l_partkey, t1.l_suppkey  
+        from lineitem_self_conn as t1 
+        left join lineitem_self_conn as t2 
+        on t1.l_orderkey = t2.l_orderkey"""
+    def join_filter_stmt_2 = """select t1.l_shipdate, t2.L_partkey, t1.l_suppkey     
+        from (select * from lineitem_self_conn where l_shipdate = '2023-10-17' ) t1 
+        left join lineitem_self_conn as t2 
+        on t1.l_orderkey = t2.l_orderkey"""
+    def join_filter_stmt_3 = """select t1.l_shipdate, t2.l_Partkey, t1.l_suppkey   
+        from lineitem_self_conn as t1 
+        left join (select * from lineitem_self_conn where l_shipdate = '2023-10-17' ) t2 
+        on t1.l_orderkey = t2.l_orderkey"""
+    def join_filter_stmt_4 = """select t1.l_shipdate, t2.l_parTkey, t1.l_suppkey  
+        from lineitem_self_conn as t1
+        left join lineitem_self_conn as t2 
+        on t1.l_orderkey = t2.l_orderkey 
+        where t1.l_shipdate = '2023-10-17'"""
+    def join_filter_stmt_5 = """select t1.l_shipdate, t2.l_partkey, t1.l_suppkey  
+        from lineitem_self_conn as t1 
+        left join lineitem_self_conn as t2 
+        on t1.l_orderkey = t2.l_orderkey 
+        where t1.l_suppkey=1"""
+
+    def mv_list = [
+            join_filter_stmt_1, join_filter_stmt_2, join_filter_stmt_3, join_filter_stmt_4, join_filter_stmt_5]
+    def join_self_conn_order = " order by 1, 2, 3"
+    for (int i =0; i < mv_list.size(); i++) {
+        logger.info("i:" + i)
+        def join_self_conn_mv = """join_self_conn_mv_${i}"""
+        create_mv(join_self_conn_mv, mv_list[i])
+        def job_name = getJobName(db, join_self_conn_mv)
+        waitingMTMVTaskFinished(job_name)
+        if (i == 0) {
+            for (int j = 0; j < mv_list.size(); j++) {
+                logger.info("j:" + j)
+                if (j == 2) {
+                    continue
+                }
+                explain {
+                    sql("${mv_list[j]}")
+                    contains "${join_self_conn_mv}(${join_self_conn_mv})"
+                }
+                compare_res(mv_list[j] + join_self_conn_order)
+            }
+        } else if (i == 1) {
+            for (int j = 0; j < mv_list.size(); j++) {
+                logger.info("j:" + j)
+                if (j == 1 || j == 3) {
+                    explain {
+                        sql("${mv_list[j]}")
+                        contains "${join_self_conn_mv}(${join_self_conn_mv})"
+                    }
+                    compare_res(mv_list[j] + join_self_conn_order)
+                } else {
+                    explain {
+                        sql("${mv_list[j]}")
+                        notContains "${join_self_conn_mv}(${join_self_conn_mv})"
+                    }
+                }
+            }
+        } else if (i == 2) {
+            for (int j = 0; j < mv_list.size(); j++) {
+                logger.info("j:" + j)
+                if (j == 2) {
+                    explain {
+                        sql("${mv_list[j]}")
+                        contains "${join_self_conn_mv}(${join_self_conn_mv})"
+                    }
+                    compare_res(mv_list[j] + join_self_conn_order)
+                } else {
+                    explain {
+                        sql("${mv_list[j]}")
+                        notContains "${join_self_conn_mv}(${join_self_conn_mv})"
+                    }
+                }
+
+            }
+        } else if (i == 3) {
+            for (int j = 0; j < mv_list.size(); j++) {
+                logger.info("j:" + j)
+                if (j == 1 || j == 3) {
+                    explain {
+                        sql("${mv_list[j]}")
+                        contains "${join_self_conn_mv}(${join_self_conn_mv})"
+                    }
+                    compare_res(mv_list[j] + join_self_conn_order)
+                } else {
+                    explain {
+                        sql("${mv_list[j]}")
+                        notContains "${join_self_conn_mv}(${join_self_conn_mv})"
+                    }
+                }
+            }
+        } else if (i == 4) {
+            for (int j = 0; j < mv_list.size(); j++) {
+                logger.info("j:" + j)
+                if (j == 4) {
+                    explain {
+                        sql("${mv_list[j]}")
+                        contains "${join_self_conn_mv}(${join_self_conn_mv})"
+                    }
+                    compare_res(mv_list[j] + join_self_conn_order)
+                } else {
+                    explain {
+                        sql("${mv_list[j]}")
+                        notContains "${join_self_conn_mv}(${join_self_conn_mv})"
+                    }
+                }
+            }
+        }
+        sql """DROP MATERIALIZED VIEW IF EXISTS ${join_self_conn_mv};"""
+    }
+
+    // join type
+    def join_type_stmt_1 = """select t1.l_shipdate, t2.l_partkey, t1.l_suppkey 
+        from lineitem_self_conn as t1 
+        left join lineitem_self_conn as t2 
+        on t1.L_ORDERKEY = t2.L_ORDERKEY"""
+    def join_type_stmt_2 = """select t1.l_shipdate, t2.l_partkey, t1.l_suppkey 
+        from lineitem_self_conn as t1 
+        inner join lineitem_self_conn as t2 
+        on t1.L_ORDERKEY = t2.L_ORDERKEY"""
+    def join_type_stmt_3 = """select t1.l_shipdate, t2.l_partkey, t1.l_suppkey 
+        from lineitem_self_conn as t1 
+        right join lineitem_self_conn as t2 
+        on t1.L_ORDERKEY = t2.L_ORDERKEY"""
+    def join_type_stmt_5 = """select t1.l_shipdate, t2.l_partkey, t1.l_suppkey 
+        from lineitem_self_conn as t1 
+        full join lineitem_self_conn as t2 
+        on t1.L_ORDERKEY = t2.L_ORDERKEY"""
+    def join_type_stmt_6 = """select t1.l_shipdate, t1.l_partkey, t1.l_suppkey 
+        from lineitem_self_conn as t1 
+        left semi join lineitem_self_conn as t2 
+        on t1.L_ORDERKEY = t2.L_ORDERKEY"""
+    def join_type_stmt_7 = """select t2.l_shipdate, t2.l_partkey, t2.l_suppkey 
+        from lineitem_self_conn as t1 
+        right semi join lineitem_self_conn as t2 
+        on t1.L_ORDERKEY = t2.L_ORDERKEY"""
+    def join_type_stmt_8 = """select t1.l_shipdate, t1.l_partkey, t1.l_suppkey 
+        from lineitem_self_conn as t1 
+        left anti join lineitem_self_conn as t2 
+        on t1.L_ORDERKEY = t2.L_ORDERKEY"""
+    def join_type_stmt_9 = """select t2.l_shipdate, t2.l_partkey, t2.l_suppkey 
+        from lineitem_self_conn as t1 
+        right anti join lineitem_self_conn as t2 
+        on t1.L_ORDERKEY = t2.L_ORDERKEY"""
+    def join_type_stmt_list = [join_type_stmt_1, join_type_stmt_2, join_type_stmt_3,
+                               join_type_stmt_5, join_type_stmt_6, join_type_stmt_7, join_type_stmt_8, join_type_stmt_9]
+    for (int i = 0; i < join_type_stmt_list.size(); i++) {
+        logger.info("i:" + i)
+        String join_type_self_conn_mv = """join_type_self_conn_mv_${i}"""
+        create_mv(join_type_self_conn_mv, join_type_stmt_list[i])
+        def job_name = getJobName(db, join_type_self_conn_mv)
+        waitingMTMVTaskFinished(job_name)
+        if (i in [4, 5]) {
+            for (int j = 0; j < join_type_stmt_list.size(); j++) {
+                logger.info("j: " + j)
+                if (j in [4, 5]) {
+                    explain {
+                        sql("${join_type_stmt_list[j]}")
+                        contains "${join_type_self_conn_mv}(${join_type_self_conn_mv})"
+                    }
+                    compare_res(join_type_stmt_list[j] + " order by 1,2,3")
+                } else {
+                    explain {
+                        sql("${join_type_stmt_list[j]}")
+                        notContains "${join_type_self_conn_mv}(${join_type_self_conn_mv})"
+                    }
+                }
+            }
+        } else if (i in [6, 7]) {
+            for (int j = 0; j < join_type_stmt_list.size(); j++) {
+                logger.info("j: " + j)
+                if (j in [6, 7]) {
+                    explain {
+                        sql("${join_type_stmt_list[j]}")
+                        contains "${join_type_self_conn_mv}(${join_type_self_conn_mv})"
+                    }
+                    compare_res(join_type_stmt_list[j] + " order by 1,2,3")
+                } else {
+                    explain {
+                        sql("${join_type_stmt_list[j]}")
+                        notContains "${join_type_self_conn_mv}(${join_type_self_conn_mv})"
+                    }
+                }
+            }
+        } else {
+            for (int j = 0; j < join_type_stmt_list.size(); j++) {
+                logger.info("j:" + j)
+                if (i == j) {
+                    explain {
+                        sql("${join_type_stmt_list[j]}")
+                        contains "${join_type_self_conn_mv}(${join_type_self_conn_mv})"
+                    }
+                    compare_res(join_type_stmt_list[j] + " order by 1,2,3")
+                } else {
+                    explain {
+                        sql("${join_type_stmt_list[j]}")
+                        notContains "${join_type_self_conn_mv}(${join_type_self_conn_mv})"
+                    }
+                }
+            }
+        }
+        sql """DROP MATERIALIZED VIEW IF EXISTS ${join_type_self_conn_mv};"""
+    }
+
+    // agg
+    // agg + with group by + with agg function
+    agg_mv_stmt = """
+        select t2.o_orderkey, 
+        sum(t1.O_TOTALPRICE) as sum_total,
+        max(t1.o_totalprice) as max_total, 
+        min(t1.o_totalprice) as min_total, 
+        count(*) as count_all, 
+        bitmap_union(to_bitmap(case when t1.o_shippriority > 1 and t1.o_orderkey IN (1, 3) then t1.o_custkey else null end)) cnt_1, 
+        bitmap_union(to_bitmap(case when t1.o_shippriority > 2 and t1.o_orderkey IN (2) then t1.o_custkey else null end)) as cnt_2 
+        from orders_self_conn as t1 
+        left join orders_self_conn as t2
+        on t1.o_orderkey = t2.o_orderkey
+        group by t2.o_orderkey
+        """
+
+    create_mv(mv_name_1, agg_mv_stmt)
+    job_name_1 = getJobName(db, mv_name_1)
+    waitingMTMVTaskFinished(job_name_1)
+
+    def agg_sql_1 = """select t2.o_orderkey,
+        count(distinct case when t1.o_shippriority > 1 and t1.o_orderkey IN (1, 3) then t1.o_custkey else null end) as cnt_1, 
+        count(distinct case when t1.O_SHIPPRIORITY > 2 and t1.o_orderkey IN (2) then t1.o_custkey else null end) as cnt_2, 
+        sum(t1.O_totalprice), 
+        max(t1.o_totalprice), 
+        min(t1.o_totalprice), 
+        count(*) 
+        from orders_self_conn as t1
+        left join orders_self_conn as t2
+        on t1.o_orderkey = t2.o_orderkey
+        group by t2.o_orderkey
+        """
+    explain {
+        sql("${agg_sql_1}")
+        contains "${mv_name_1}(${mv_name_1})"
+    }
+    compare_res(agg_sql_1 + " order by 1,2,3,4,5,6,7")
+
+    agg_sql_1 = """select t2.o_orderkey,
+        count(distinct case when t1.o_shippriority > 1 and t1.o_orderkey IN (1, 3) then t1.o_custkey else null end) as cnt_1, 
+        count(distinct case when t1.O_SHIPPRIORITY > 2 and t1.o_orderkey IN (2) then t1.o_custkey else null end) as cnt_2, 
+        sum(t1.O_totalprice), 
+        max(t1.o_totalprice), 
+        min(t1.o_totalprice), 
+        count(*) 
+        from orders_self_conn as t1
+        left join orders_self_conn as t2
+        on t1.o_orderkey = t2.o_orderkey
+        group by t2.o_orderkey
+        """
+    explain {
+        sql("${agg_sql_1}")
+        contains "${mv_name_1}(${mv_name_1})"
+    }
+    compare_res(agg_sql_1 + " order by 1,2,3,4,5,6")
+    sql """DROP MATERIALIZED VIEW IF EXISTS ${mv_name_1};"""
+
+    // agg + with group by + without agg function
+
+    def agg_mv_stmt_2 = """
+        select t1.o_orderdatE, t2.O_SHIPPRIORITY, t1.o_comment  
+        from orders_self_conn as t1 
+        inner join  orders_self_conn as t2 
+        on t1.o_orderkey = t2.o_orderkey 
+        group by 
+        t1.o_orderdate, 
+        t2.o_shippriority, 
+        t1.o_comment  
+        """
+    create_mv(mv_name_1, agg_mv_stmt_2)
+    def agg_job_name_2 = getJobName(db, mv_name_1)
+    waitingMTMVTaskFinished(agg_job_name_2)
+    sql """analyze table ${mv_name_1} with sync;"""
+
+    def agg_sql_2 = """
+        select t2.O_SHIPPRIORITY, t1.o_comment  
+        from orders_self_conn as t1 
+        inner join  orders_self_conn as t2 
+        on t1.o_orderkey = t2.o_orderkey 
+        group by 
+        t2.o_shippriority, 
+        t1.o_comment  
+        """
+    explain {
+        sql("${agg_sql_2}")
+        contains "${mv_name_1}(${mv_name_1})"
+    }
+    compare_res(agg_sql_2 + " order by 1,2")
+
+    // agg + with group by + with agg function
+    def agg_mv_stmt_3 = """
+        select t1.o_orderdatE, t2.o_shippriority, t1.o_comment, 
+        sum(t1.o_totalprice) as sum_total, 
+        max(t2.o_totalpricE) as max_total, 
+        min(t1.o_totalprice) as min_total, 
+        count(*) as count_all, 
+        bitmap_union(to_bitmap(case when t1.o_shippriority > 1 and t1.o_orderkey IN (1, 3) then t1.o_custkey else null end)) cnt_1, 
+        bitmap_union(to_bitmap(case when t2.o_shippriority > 2 and t2.o_orderkey IN (2) then t2.o_custkey else null end)) as cnt_2 
+        from orders_self_conn  as t1 
+        inner join orders_self_conn as t2
+        on t1.o_orderkey = t2.o_orderkey
+        group by 
+        t1.o_orderdatE, 
+        t2.o_shippriority, 
+        t1.o_comment 
+        """
+    create_mv(mv_name_1, agg_mv_stmt_3)
+    def agg_job_name_3 = getJobName(db, mv_name_1)
+    waitingMTMVTaskFinished(agg_job_name_3)
+    sql """analyze table ${mv_name_1} with sync;"""
+
+    def agg_sql_3 = """
+        select t2.o_shippriority, t1.o_comment, 
+        sum(t1.o_totalprice) as sum_total, 
+        max(t2.o_totalpricE) as max_total, 
+        min(t1.o_totalprice) as min_total, 
+        count(*) as count_all, 
+        bitmap_union(to_bitmap(case when t1.o_shippriority > 1 and t1.o_orderkey IN (1, 3) then t1.o_custkey else null end)) cnt_1, 
+        bitmap_union(to_bitmap(case when t2.o_shippriority > 2 and t2.o_orderkey IN (2) then t2.o_custkey else null end)) as cnt_2 
+        from orders_self_conn  as t1 
+        inner join orders_self_conn as t2
+        on t1.o_orderkey = t2.o_orderkey
+        group by 
+        t2.o_shippriority, 
+        t1.o_comment
+        """
+    explain {
+        sql("${agg_sql_3}")
+        contains "${mv_name_1}(${mv_name_1})"
+    }
+    compare_res(agg_sql_3 + " order by 1,2,3,4,5,6")
+
+
+    // view partial rewriting
+    def view_partition_mv_stmt_1 = """
+        select t1.l_shipdatE, t2.l_partkey, t1.l_orderkey 
+        from lineitem_self_conn as t1 
+        inner join lineitem_self_conn as t2 
+        on t1.L_ORDERKEY = t2.L_ORDERKEY
+        group by t1.l_shipdate, t2.l_partkey, t1.l_orderkeY"""
+    create_mv(mv_name_1, view_partition_mv_stmt_1)
+    def view_partition_job_name_1 = getJobName(db, mv_name_1)
+    waitingMTMVTaskFinished(view_partition_job_name_1)
+
+    def view_partition_sql_1 = """select t.l_shipdate, lineitem_self_conn.l_orderkey, t.l_partkey 
+        from (
+            select t1.l_shipdatE as l_shipdatE, t2.l_partkey as l_partkey, t1.l_orderkey as l_orderkey 
+            from lineitem_self_conn as t1 
+            inner join lineitem_self_conn as t2 
+            on t1.L_ORDERKEY = t2.L_ORDERKEY
+            group by t1.l_shipdate, t2.l_partkey, t1.l_orderkeY
+        ) t 
+        inner join lineitem_self_conn    
+        on t.l_partkey = lineitem_self_conn.l_partkey 
+        group by t.l_shipdate, lineitem_self_conn.l_orderkey, t.l_partkey
+        """
+    explain {
+        sql("${view_partition_sql_1}")
+        contains "${mv_name_1}(${mv_name_1})"
+    }
+    compare_res(view_partition_sql_1 + " order by 1,2,3")
+    sql """DROP MATERIALIZED VIEW IF EXISTS ${mv_name_1};"""
+
+
+    // predicate compensate
+    def predicate_mv_stmt_1 = """
+        select t1.l_shipdatE, t2.l_shipdate, t1.l_partkey 
+        from lineitem_self_conn as t1 
+        inner join lineitem_self_conn as t2  
+        on t1.l_orderkey = t2.l_orderkey
+        where t1.l_shipdate >= "2023-10-17"
+        """
+    create_mv(mv_name_1, predicate_mv_stmt_1)
+    def predicate_job_name_1 = getJobName(db, mv_name_1)
+    waitingMTMVTaskFinished(predicate_job_name_1)
+
+    def predicate_sql_1 = """
+        select t1.l_shipdatE, t2.l_shipdate, t1.l_partkey 
+        from lineitem_self_conn as t1 
+        inner join lineitem_self_conn as t2  
+        on t1.l_orderkey = t2.l_orderkey
+        where t1.l_shipdate >= "2023-10-17" and t1.l_partkey = 1
+        """
+    explain {
+        sql("${predicate_sql_1}")
+        contains "${mv_name_1}(${mv_name_1})"
+    }
+    compare_res(predicate_sql_1 + " order by 1,2,3")
+    sql """DROP MATERIALIZED VIEW IF EXISTS ${mv_name_1};"""
+
+}
diff --git a/regression-test/suites/nereids_rules_p0/mv/nested_mtmv/nested_mtmv.groovy b/regression-test/suites/nereids_rules_p0/mv/nested_mtmv/nested_mtmv.groovy
new file mode 100644
index 00000000000..4a0f513f0fa
--- /dev/null
+++ b/regression-test/suites/nereids_rules_p0/mv/nested_mtmv/nested_mtmv.groovy
@@ -0,0 +1,859 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("nested_mtmv") {
+    String db = context.config.getDbNameByFile(context.file)
+    sql "use ${db}"
+    sql "SET enable_nereids_planner=true"
+    sql "SET enable_fallback_to_original_planner=false"
+    sql "SET enable_materialized_view_rewrite=true"
+    sql "SET enable_nereids_timeout = false"
+
+    sql """
+    drop table if exists orders_1
+    """
+
+    sql """CREATE TABLE `orders_1` (
+      `o_orderkey` BIGINT NULL,
+      `o_custkey` INT NULL,
+      `o_orderstatus` VARCHAR(1) NULL,
+      `o_totalprice` DECIMAL(15, 2)  NULL,
+      `o_orderpriority` VARCHAR(15) NULL,
+      `o_clerk` VARCHAR(15) NULL,
+      `o_shippriority` INT NULL,
+      `o_comment` VARCHAR(79) NULL,
+      `o_orderdate` DATE not NULL
+    ) ENGINE=OLAP
+    DUPLICATE KEY(`o_orderkey`, `o_custkey`)
+    COMMENT 'OLAP'
+    auto partition by range (date_trunc(`o_orderdate`, 'day')) ()
+    DISTRIBUTED BY HASH(`o_orderkey`) BUCKETS 96
+    PROPERTIES (
+    "replication_allocation" = "tag.location.default: 1"
+    );"""
+
+    sql """
+    drop table if exists lineitem_1
+    """
+
+    sql """CREATE TABLE `lineitem_1` (
+      `l_orderkey` BIGINT NULL,
+      `l_linenumber` INT NULL,
+      `l_partkey` INT NULL,
+      `l_suppkey` INT NULL,
+      `l_quantity` DECIMAL(15, 2) NULL,
+      `l_extendedprice` DECIMAL(15, 2) NULL,
+      `l_discount` DECIMAL(15, 2) NULL,
+      `l_tax` DECIMAL(15, 2) NULL,
+      `l_returnflag` VARCHAR(1) NULL,
+      `l_linestatus` VARCHAR(1) NULL,
+      `l_commitdate` DATE NULL,
+      `l_receiptdate` DATE NULL,
+      `l_shipinstruct` VARCHAR(25) NULL,
+      `l_shipmode` VARCHAR(10) NULL,
+      `l_comment` VARCHAR(44) NULL,
+      `l_shipdate` DATE not NULL
+    ) ENGINE=OLAP
+    DUPLICATE KEY(l_orderkey, l_linenumber, l_partkey, l_suppkey )
+    COMMENT 'OLAP'
+    auto partition by range (date_trunc(`l_shipdate`, 'day')) ()
+    DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
+    PROPERTIES (
+    "replication_allocation" = "tag.location.default: 1"
+    );"""
+
+    sql """
+    drop table if exists partsupp_1
+    """
+
+    sql """CREATE TABLE `partsupp_1` (
+      `ps_partkey` INT NULL,
+      `ps_suppkey` INT NULL,
+      `ps_availqty` INT NULL,
+      `ps_supplycost` DECIMAL(15, 2) NULL,
+      `ps_comment` VARCHAR(199) NULL
+    ) ENGINE=OLAP
+    DUPLICATE KEY(`ps_partkey`)
+    COMMENT 'OLAP'
+    DISTRIBUTED BY HASH(`ps_partkey`) BUCKETS 24
+    PROPERTIES (
+    "replication_allocation" = "tag.location.default: 1"
+    );"""
+
+    sql """
+    insert into orders_1 values 
+    (null, 1, 'k', 99.5, 'a', 'b', 1, 'yy', '2023-10-17'),
+    (1, null, 'o', 109.2, 'c','d',2, 'mm', '2023-10-17'),
+    (3, 3, null, 99.5, 'a', 'b', 1, 'yy', '2023-10-19'),
+    (1, 2, 'o', null, 'a', 'b', 1, 'yy', '2023-10-20'),
+    (2, 3, 'k', 109.2, null,'d',2, 'mm', '2023-10-21'),
+    (3, 1, 'k', 99.5, 'a', null, 1, 'yy', '2023-10-22'),
+    (1, 3, 'o', 99.5, 'a', 'b', null, 'yy', '2023-10-19'),
+    (2, 1, 'o', 109.2, 'c','d',2, null, '2023-10-18'),
+    (3, 2, 'k', 99.5, 'a', 'b', 1, 'yy', '2023-10-17'),
+    (4, 5, 'k', 99.5, 'a', 'b', 1, 'yy', '2023-10-19'); 
+    """
+
+    sql """
+    insert into lineitem_1 values 
+    (null, 1, 2, 3, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy', '2023-10-17'),
+    (1, 1, 3, 1, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-18', '2023-10-18', 'a', 'b', 'yyyyyyyyy', '2023-10-17'),
+    (3, 3, 3, 2, 7.5, 8.5, 9.5, 10.5, 'k', 'o', '2023-10-19', '2023-10-19', 'c', 'd', 'xxxxxxxxx', '2023-10-19'),
+    (1, 2, 3, 2, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy', '2023-10-17'),
+    (2, 1, 2, 1, 5.5, 6.5, 7.5, 8.5, 'o', 'k', null, '2023-10-18', 'a', 'b', 'yyyyyyyyy', '2023-10-18'),
+    (3, 1, 3, 1, 7.5, 8.5, 9.5, 10.5, 'k', 'o', '2023-10-19', null, 'c', 'd', 'xxxxxxxxx', '2023-10-19'),
+    (1, 2, 1, 2, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy', '2023-10-17'),
+    (2, 2, 2, 2, 5.5, 6.5, 7.5, 8.5, 'o', 'k', null, '2023-10-18', 'a', 'b', 'yyyyyyyyy', '2023-10-18'),
+    (3, 3, 3, 3, 7.5, 8.5, 9.5, 10.5, 'k', 'o', '2023-10-19', null, 'c', 'd', 'xxxxxxxxx', '2023-10-19'),
+    (1, 1, 1, 1, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy', '2023-10-17');
+    """
+
+    sql """
+    insert into partsupp_1 values 
+    (1, 1, 1, 99.5, 'yy'),
+    (2, 2, 2, 109.2, 'mm'),
+    (3, 3, 1, 99.5, 'yy'),
+    (3, null, 1, 99.5, 'yy'); 
+    """
+
+    sql """analyze table orders_1 with sync;"""
+    sql """analyze table lineitem_1 with sync;"""
+    sql """analyze table partsupp_1 with sync;"""
+
+    def create_mv = { mv_name, mv_sql ->
+        sql """DROP MATERIALIZED VIEW IF EXISTS ${mv_name};"""
+        sql """DROP TABLE IF EXISTS ${mv_name}"""
+        sql """
+        CREATE MATERIALIZED VIEW ${mv_name} 
+        BUILD IMMEDIATE REFRESH AUTO ON MANUAL 
+        DISTRIBUTED BY RANDOM BUCKETS 2 
+        PROPERTIES ('replication_num' = '1') 
+        AS  
+        ${mv_sql}
+        """
+    }
+
+    def compare_res = { def stmt ->
+        sql "SET enable_materialized_view_rewrite=false"
+        def origin_res = sql stmt
+        logger.info("origin_res: " + origin_res)
+        sql "SET enable_materialized_view_rewrite=true"
+        def mv_origin_res = sql stmt
+        logger.info("mv_origin_res: " + mv_origin_res)
+        assertTrue((mv_origin_res == [] && origin_res == []) || (mv_origin_res.size() == origin_res.size()))
+        for (int row = 0; row < mv_origin_res.size(); row++) {
+            assertTrue(mv_origin_res[row].size() == origin_res[row].size())
+            for (int col = 0; col < mv_origin_res[row].size(); col++) {
+                assertTrue(mv_origin_res[row][col] == origin_res[row][col])
+            }
+        }
+    }
+
+    // sr
+    def mv_stmt_1 = """SELECT l_orderkey, l_linenumber, l_partkey, o_orderkey, o_custkey
+        FROM lineitem_1 INNER JOIN orders_1
+        ON l_orderkey = o_orderkey"""
+    def mv_name_1 = "join_mv1"
+    def mv_stmt_2 = """SELECT 
+          l_orderkey, 
+          l_linenumber, 
+          o_orderkey, 
+          sum(l_partkey) AS total_revenue, 
+          max(o_custkey) AS max_discount 
+        FROM ${mv_name_1}
+        GROUP BY l_orderkey, l_linenumber, o_orderkey;"""
+    def mv_name_2 = "agg_mv2"
+    def mv_stmt_3 = """SELECT 
+          l_orderkey, 
+          sum(total_revenue) AS total_revenue, 
+          max(max_discount) AS max_discount 
+        FROM ${mv_name_2}
+        GROUP BY l_orderkey;"""
+    def mv_name_3 = "join_agg_mv3"
+    create_mv(mv_name_1, mv_stmt_1)
+    def job_name_1 = getJobName(db, mv_name_1)
+    waitingMTMVTaskFinished(job_name_1)
+
+    create_mv(mv_name_2, mv_stmt_2)
+    job_name_1 = getJobName(db, mv_name_2)
+    waitingMTMVTaskFinished(job_name_1)
+
+    create_mv(mv_name_3, mv_stmt_3)
+    job_name_1 = getJobName(db, mv_name_3)
+    waitingMTMVTaskFinished(job_name_1)
+
+    def query_stmt_1 = """SELECT 
+          l_orderkey, 
+          sum(l_partkey) AS total_revenue, 
+          max(o_custkey) AS max_discount 
+        FROM lineitem_1 INNER JOIN orders_1
+        ON l_orderkey = o_orderkey
+        GROUP BY l_orderkey"""
+    explain {
+        sql("${query_stmt_1}")
+        contains "${mv_name_3}(${mv_name_3})"
+    }
+    compare_res(query_stmt_1 + " order by 1,2,3")
+
+    // user
+    def mv_stmt_4 = """
+        select l_orderkey, l_linenumber, l_partkey, o_orderkey, o_custkey, ps_partkey, cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as col1 
+        from lineitem_1 
+        inner join orders_1 on lineitem_1.l_orderkey = orders_1.o_orderkey
+        inner join partsupp_1 on lineitem_1.l_partkey = partsupp_1.ps_partkey and lineitem_1.l_suppkey = partsupp_1.ps_suppkey
+        where lineitem_1.l_shipdate >= "2023-10-17" 
+        group by l_orderkey, l_linenumber, l_partkey, o_orderkey, o_custkey, ps_partkey
+        """
+    def mv_level1_name = "mv_level1_name"
+    def mv_stmt_5 = """
+        select l_orderkey, l_linenumber, l_partkey, o_orderkey, o_custkey, ps_partkey, col1
+        from ${mv_level1_name}
+        """
+    def mv_level2_name = "mv_level2_name"
+    def mv_stmt_6 = """
+        select t1.l_orderkey, t2.l_linenumber, t1.l_partkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.col1
+        from ${mv_level1_name} as t1
+        left join ${mv_level1_name} as t2 
+        on t1.l_orderkey = t2.l_orderkey
+        """
+    def mv_level3_name = "mv_level3_name"
+    def mv_stmt_7 = """
+        select t1.l_orderkey, t2.l_linenumber, t1.l_partkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.col1
+        from ${mv_level2_name} as t1
+        left join ${mv_level2_name} as t2 
+        on t1.l_orderkey = t2.l_orderkey
+        """
+    def mv_level4_name = "mv_level4_name"
+
+    create_mv(mv_level1_name, mv_stmt_4)
+    job_name_1 = getJobName(db, mv_level1_name)
+    waitingMTMVTaskFinished(job_name_1)
+
+    create_mv(mv_level2_name, mv_stmt_5)
+    job_name_1 = getJobName(db, mv_level2_name)
+    waitingMTMVTaskFinished(job_name_1)
+
+    create_mv(mv_level3_name, mv_stmt_6)
+    job_name_1 = getJobName(db, mv_level3_name)
+    waitingMTMVTaskFinished(job_name_1)
+
+    create_mv(mv_level4_name, mv_stmt_7)
+    job_name_1 = getJobName(db, mv_level4_name)
+    waitingMTMVTaskFinished(job_name_1)
+
+    def query_stmt_2 = """
+        select t1.l_orderkey, t2.l_linenumber, t1.l_partkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.col1
+        from (select l_orderkey, l_linenumber, l_partkey, o_orderkey, o_custkey, ps_partkey, col1 
+            from (select l_orderkey, l_linenumber, l_partkey, o_orderkey, o_custkey, ps_partkey, cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as col1 
+                from lineitem_1 
+                inner join orders_1 on lineitem_1.l_orderkey = orders_1.o_orderkey
+                inner join partsupp_1 on lineitem_1.l_partkey = partsupp_1.ps_partkey and lineitem_1.l_suppkey = partsupp_1.ps_suppkey
+                where lineitem_1.l_shipdate >= "2023-10-17" 
+                group by l_orderkey, l_linenumber, l_partkey, o_orderkey, o_custkey, ps_partkey  
+                ) as t1
+            ) as t1 
+        left join (select l_orderkey, l_linenumber, l_partkey, o_orderkey, o_custkey, ps_partkey, col1 
+            from (select l_orderkey, l_linenumber, l_partkey, o_orderkey, o_custkey, ps_partkey, cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as col1 
+                from lineitem_1 
+                inner join orders_1 on lineitem_1.l_orderkey = orders_1.o_orderkey
+                inner join partsupp_1 on lineitem_1.l_partkey = partsupp_1.ps_partkey and lineitem_1.l_suppkey = partsupp_1.ps_suppkey
+                where lineitem_1.l_shipdate >= "2023-10-17" 
+                group by l_orderkey, l_linenumber, l_partkey, o_orderkey, o_custkey, ps_partkey  
+                ) as t1 
+            ) as t2 on t1.l_orderkey = t2.l_orderkey
+        """
+    explain {
+        sql("${query_stmt_2}")
+        contains "${mv_level3_name}(${mv_level3_name})"
+    }
+    compare_res(query_stmt_2 + " order by 1,2,3,4,5,6,7")
+
+    // five levels of nested materialized views
+    def mv_1 = "mv1"
+    def mv_2 = "mv2"
+    def mv_3 = "mv3"
+    def mv_4 = "mv4"
+    def mv_5 = "mv5"
+
+    def join_mv_1 = """
+        select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1,
+        sum(o_totalprice) as sum_total, 
+        max(o_totalprice) as max_total, 
+        min(o_totalprice) as min_total, 
+        count(*) as count_all, 
+        bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, 
+        bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 
+        from lineitem_1
+        inner join orders_1
+        on lineitem_1.l_orderkey = orders_1.o_orderkey
+        where lineitem_1.l_shipdate >= "2023-10-17"
+        group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey
+        """
+    def join_mv_2 = """
+        select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey,
+        t.agg1 as agg1, 
+        t.sum_total as agg3,
+        t.max_total as agg4,
+        t.min_total as agg5,
+        t.count_all as agg6,
+        cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2
+        from ${mv_1} as t
+        inner join partsupp_1
+        on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey
+        where partsupp_1.ps_suppkey > 1
+        group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6
+        """
+    def join_mv_3 = """
+        select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 
+        from ${mv_2} as t1
+        left join ${mv_2} as t2
+        on t1.l_orderkey = t2.l_orderkey
+        where t1.l_orderkey > 1
+        group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6
+        """
+    def join_mv_4 = """
+        select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 
+        from ${mv_3} as t1
+        left join ${mv_3} as t2
+        on t1.l_orderkey = t2.l_orderkey
+        group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6
+        """
+    def join_mv_5 = """
+        select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 
+        from ${mv_4} as t1
+        left join ${mv_4} as t2
+        on t1.l_orderkey = t2.l_orderkey
+        group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6
+        """
+
+    create_mv(mv_1, join_mv_1)
+    job_name_1 = getJobName(db, mv_1)
+    waitingMTMVTaskFinished(job_name_1)
+
+    create_mv(mv_2, join_mv_2)
+    job_name_1 = getJobName(db, mv_2)
+    waitingMTMVTaskFinished(job_name_1)
+
+    create_mv(mv_3, join_mv_3)
+    job_name_1 = getJobName(db, mv_3)
+    waitingMTMVTaskFinished(job_name_1)
+
+    create_mv(mv_4, join_mv_4)
+    job_name_1 = getJobName(db, mv_4)
+    waitingMTMVTaskFinished(job_name_1)
+
+    create_mv(mv_5, join_mv_5)
+    job_name_1 = getJobName(db, mv_5)
+    waitingMTMVTaskFinished(job_name_1)
+
+
+    def sql_2 = """
+        select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, 
+        t.agg1 as agg1, 
+        t.sum_total as agg3,
+        t.max_total as agg4,
+        t.min_total as agg5,
+        t.count_all as agg6,
+        cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2
+        from (
+            select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1,
+            sum(o_totalprice) as sum_total, 
+            max(o_totalprice) as max_total, 
+            min(o_totalprice) as min_total, 
+            count(*) as count_all, 
+            bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, 
+            bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 
+            from lineitem_1
+            inner join orders_1
+            on lineitem_1.l_orderkey = orders_1.o_orderkey
+            where lineitem_1.l_shipdate >= "2023-10-17"
+            group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey
+        ) as t
+        inner join partsupp_1
+        on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey
+        where partsupp_1.ps_suppkey > 1
+        group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6
+        """
+    def sql_3 = """
+        select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 
+        from (
+            select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, 
+            t.agg1 as agg1, 
+            t.sum_total as agg3,
+            t.max_total as agg4,
+            t.min_total as agg5,
+            t.count_all as agg6,
+            cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2
+            from (
+                select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1,
+                sum(o_totalprice) as sum_total, 
+                max(o_totalprice) as max_total, 
+                min(o_totalprice) as min_total, 
+                count(*) as count_all, 
+                bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, 
+                bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 
+                from lineitem_1
+                inner join orders_1
+                on lineitem_1.l_orderkey = orders_1.o_orderkey
+                where lineitem_1.l_shipdate >= "2023-10-17"
+                group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey
+            ) as t
+            inner join partsupp_1
+            on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey
+            where partsupp_1.ps_suppkey > 1
+            group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6
+        ) as t1
+        left join (
+            select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, 
+            t.agg1 as agg1, 
+            t.sum_total as agg3,
+            t.max_total as agg4,
+            t.min_total as agg5,
+            t.count_all as agg6,
+            cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2
+            from (
+                select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1,
+                sum(o_totalprice) as sum_total, 
+                max(o_totalprice) as max_total, 
+                min(o_totalprice) as min_total, 
+                count(*) as count_all, 
+                bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, 
+                bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 
+                from lineitem_1
+                inner join orders_1
+                on lineitem_1.l_orderkey = orders_1.o_orderkey
+                where lineitem_1.l_shipdate >= "2023-10-17"
+                group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey
+            ) as t
+            inner join partsupp_1
+            on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey
+            where partsupp_1.ps_suppkey > 1
+            group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6
+        ) as t2
+        on t1.l_orderkey = t2.l_orderkey
+        where t1.l_orderkey > 1
+        group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6
+        """
+    def sql_4 = """
+        select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 
+        from (
+            select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 
+            from (
+                select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, 
+                t.agg1 as agg1, 
+                t.sum_total as agg3,
+                t.max_total as agg4,
+                t.min_total as agg5,
+                t.count_all as agg6,
+                cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2
+                from (
+                    select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1,
+                    sum(o_totalprice) as sum_total, 
+                    max(o_totalprice) as max_total, 
+                    min(o_totalprice) as min_total, 
+                    count(*) as count_all, 
+                    bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, 
+                    bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 
+                    from lineitem_1
+                    inner join orders_1
+                    on lineitem_1.l_orderkey = orders_1.o_orderkey
+                    where lineitem_1.l_shipdate >= "2023-10-17"
+                    group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey
+                ) as t
+                inner join partsupp_1
+                on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey
+                where partsupp_1.ps_suppkey > 1
+                group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6
+            ) as t1
+            left join (
+                select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, 
+                t.agg1 as agg1, 
+                t.sum_total as agg3,
+                t.max_total as agg4,
+                t.min_total as agg5,
+                t.count_all as agg6,
+                cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2
+                from (
+                    select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1,
+                    sum(o_totalprice) as sum_total, 
+                    max(o_totalprice) as max_total, 
+                    min(o_totalprice) as min_total, 
+                    count(*) as count_all, 
+                    bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, 
+                    bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 
+                    from lineitem_1
+                    inner join orders_1
+                    on lineitem_1.l_orderkey = orders_1.o_orderkey
+                    where lineitem_1.l_shipdate >= "2023-10-17"
+                    group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey
+                ) as t
+                inner join partsupp_1
+                on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey
+                where partsupp_1.ps_suppkey > 1
+                group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6
+            ) as t2
+            on t1.l_orderkey = t2.l_orderkey
+            where t1.l_orderkey > 1
+            group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6
+        ) as t1
+        left join (
+            select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 
+            from (
+                select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, 
+                t.agg1 as agg1, 
+                t.sum_total as agg3,
+                t.max_total as agg4,
+                t.min_total as agg5,
+                t.count_all as agg6,
+                cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2
+                from (
+                    select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1,
+                    sum(o_totalprice) as sum_total, 
+                    max(o_totalprice) as max_total, 
+                    min(o_totalprice) as min_total, 
+                    count(*) as count_all, 
+                    bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, 
+                    bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 
+                    from lineitem_1
+                    inner join orders_1
+                    on lineitem_1.l_orderkey = orders_1.o_orderkey
+                    where lineitem_1.l_shipdate >= "2023-10-17"
+                    group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey
+                ) as t
+                inner join partsupp_1
+                on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey
+                where partsupp_1.ps_suppkey > 1
+                group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6
+            ) as t1
+            left join (
+                select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, 
+                t.agg1 as agg1, 
+                t.sum_total as agg3,
+                t.max_total as agg4,
+                t.min_total as agg5,
+                t.count_all as agg6,
+                cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2
+                from (
+                    select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1,
+                    sum(o_totalprice) as sum_total, 
+                    max(o_totalprice) as max_total, 
+                    min(o_totalprice) as min_total, 
+                    count(*) as count_all, 
+                    bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, 
+                    bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 
+                    from lineitem_1
+                    inner join orders_1
+                    on lineitem_1.l_orderkey = orders_1.o_orderkey
+                    where lineitem_1.l_shipdate >= "2023-10-17"
+                    group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey
+                ) as t
+                inner join partsupp_1
+                on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey
+                where partsupp_1.ps_suppkey > 1
+                group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6
+            ) as t2
+            on t1.l_orderkey = t2.l_orderkey
+            where t1.l_orderkey > 1
+            group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6
+        ) as t2
+        on t1.l_orderkey = t2.l_orderkey
+        group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6
+        """
+    def sql_5 = """
+        select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 
+        from (
+            select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 
+            from (
+                select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 
+                from (
+                    select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, 
+                    t.agg1 as agg1, 
+                    t.sum_total as agg3,
+                    t.max_total as agg4,
+                    t.min_total as agg5,
+                    t.count_all as agg6,
+                    cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2
+                    from (
+                        select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1,
+                        sum(o_totalprice) as sum_total, 
+                        max(o_totalprice) as max_total, 
+                        min(o_totalprice) as min_total, 
+                        count(*) as count_all, 
+                        bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, 
+                        bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 
+                        from lineitem_1
+                        inner join orders_1
+                        on lineitem_1.l_orderkey = orders_1.o_orderkey
+                        where lineitem_1.l_shipdate >= "2023-10-17"
+                        group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey
+                    ) as t
+                    inner join partsupp_1
+                    on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey
+                    where partsupp_1.ps_suppkey > 1
+                    group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6
+                ) as t1
+                left join (
+                    select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, 
+                    t.agg1 as agg1, 
+                    t.sum_total as agg3,
+                    t.max_total as agg4,
+                    t.min_total as agg5,
+                    t.count_all as agg6,
+                    cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2
+                    from (
+                        select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1,
+                        sum(o_totalprice) as sum_total, 
+                        max(o_totalprice) as max_total, 
+                        min(o_totalprice) as min_total, 
+                        count(*) as count_all, 
+                        bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, 
+                        bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 
+                        from lineitem_1
+                        inner join orders_1
+                        on lineitem_1.l_orderkey = orders_1.o_orderkey
+                        where lineitem_1.l_shipdate >= "2023-10-17"
+                        group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey
+                    ) as t
+                    inner join partsupp_1
+                    on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey
+                    where partsupp_1.ps_suppkey > 1
+                    group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6
+                ) as t2
+                on t1.l_orderkey = t2.l_orderkey
+                where t1.l_orderkey > 1
+                group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6
+            ) as t1
+            left join (
+                select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 
+                from (
+                    select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, 
+                    t.agg1 as agg1, 
+                    t.sum_total as agg3,
+                    t.max_total as agg4,
+                    t.min_total as agg5,
+                    t.count_all as agg6,
+                    cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2
+                    from (
+                        select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1,
+                        sum(o_totalprice) as sum_total, 
+                        max(o_totalprice) as max_total, 
+                        min(o_totalprice) as min_total, 
+                        count(*) as count_all, 
+                        bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, 
+                        bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 
+                        from lineitem_1
+                        inner join orders_1
+                        on lineitem_1.l_orderkey = orders_1.o_orderkey
+                        where lineitem_1.l_shipdate >= "2023-10-17"
+                        group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey
+                    ) as t
+                    inner join partsupp_1
+                    on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey
+                    where partsupp_1.ps_suppkey > 1
+                    group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6
+                ) as t1
+                left join (
+                    select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, 
+                    t.agg1 as agg1, 
+                    t.sum_total as agg3,
+                    t.max_total as agg4,
+                    t.min_total as agg5,
+                    t.count_all as agg6,
+                    cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2
+                    from (
+                        select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1,
+                        sum(o_totalprice) as sum_total, 
+                        max(o_totalprice) as max_total, 
+                        min(o_totalprice) as min_total, 
+                        count(*) as count_all, 
+                        bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, 
+                        bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 
+                        from lineitem_1
+                        inner join orders_1
+                        on lineitem_1.l_orderkey = orders_1.o_orderkey
+                        where lineitem_1.l_shipdate >= "2023-10-17"
+                        group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey
+                    ) as t
+                    inner join partsupp_1
+                    on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey
+                    where partsupp_1.ps_suppkey > 1
+                    group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6
+                ) as t2
+                on t1.l_orderkey = t2.l_orderkey
+                where t1.l_orderkey > 1
+                group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6
+            ) as t2
+            on t1.l_orderkey = t2.l_orderkey
+            group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6
+        ) as t1
+        left join (
+            select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 
+            from (
+                select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 
+                from (
+                    select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, 
+                    t.agg1 as agg1, 
+                    t.sum_total as agg3,
+                    t.max_total as agg4,
+                    t.min_total as agg5,
+                    t.count_all as agg6,
+                    cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2
+                    from (
+                        select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1,
+                        sum(o_totalprice) as sum_total, 
+                        max(o_totalprice) as max_total, 
+                        min(o_totalprice) as min_total, 
+                        count(*) as count_all, 
+                        bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, 
+                        bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 
+                        from lineitem_1
+                        inner join orders_1
+                        on lineitem_1.l_orderkey = orders_1.o_orderkey
+                        where lineitem_1.l_shipdate >= "2023-10-17"
+                        group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey
+                    ) as t
+                    inner join partsupp_1
+                    on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey
+                    where partsupp_1.ps_suppkey > 1
+                    group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6
+                ) as t1
+                left join (
+                    select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, 
+                    t.agg1 as agg1, 
+                    t.sum_total as agg3,
+                    t.max_total as agg4,
+                    t.min_total as agg5,
+                    t.count_all as agg6,
+                    cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2
+                    from (
+                        select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1,
+                        sum(o_totalprice) as sum_total, 
+                        max(o_totalprice) as max_total, 
+                        min(o_totalprice) as min_total, 
+                        count(*) as count_all, 
+                        bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, 
+                        bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 
+                        from lineitem_1
+                        inner join orders_1
+                        on lineitem_1.l_orderkey = orders_1.o_orderkey
+                        where lineitem_1.l_shipdate >= "2023-10-17"
+                        group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey
+                    ) as t
+                    inner join partsupp_1
+                    on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey
+                    where partsupp_1.ps_suppkey > 1
+                    group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6
+                ) as t2
+                on t1.l_orderkey = t2.l_orderkey
+                where t1.l_orderkey > 1
+                group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6
+            ) as t1
+            left join (
+                select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 
+                from (
+                    select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, 
+                    t.agg1 as agg1, 
+                    t.sum_total as agg3,
+                    t.max_total as agg4,
+                    t.min_total as agg5,
+                    t.count_all as agg6,
+                    cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2
+                    from (
+                        select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1,
+                        sum(o_totalprice) as sum_total, 
+                        max(o_totalprice) as max_total, 
+                        min(o_totalprice) as min_total, 
+                        count(*) as count_all, 
+                        bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, 
+                        bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 
+                        from lineitem_1
+                        inner join orders_1
+                        on lineitem_1.l_orderkey = orders_1.o_orderkey
+                        where lineitem_1.l_shipdate >= "2023-10-17"
+                        group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey
+                    ) as t
+                    inner join partsupp_1
+                    on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey
+                    where partsupp_1.ps_suppkey > 1
+                    group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6
+                ) as t1
+                left join (
+                    select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, 
+                    t.agg1 as agg1, 
+                    t.sum_total as agg3,
+                    t.max_total as agg4,
+                    t.min_total as agg5,
+                    t.count_all as agg6,
+                    cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2
+                    from (
+                        select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1,
+                        sum(o_totalprice) as sum_total, 
+                        max(o_totalprice) as max_total, 
+                        min(o_totalprice) as min_total, 
+                        count(*) as count_all, 
+                        bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, 
+                        bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 
+                        from lineitem_1
+                        inner join orders_1
+                        on lineitem_1.l_orderkey = orders_1.o_orderkey
+                        where lineitem_1.l_shipdate >= "2023-10-17"
+                        group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey
+                    ) as t
+                    inner join partsupp_1
+                    on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey
+                    where partsupp_1.ps_suppkey > 1
+                    group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6
+                ) as t2
+                on t1.l_orderkey = t2.l_orderkey
+                where t1.l_orderkey > 1
+                group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6
+            ) as t2
+            on t1.l_orderkey = t2.l_orderkey
+            group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6
+        ) as t2
+        on t1.l_orderkey = t2.l_orderkey
+        group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6
+        """
+
+    explain {
+        sql("${sql_2}")
+        contains "${mv_2}(${mv_2})"
+    }
+    compare_res(sql_2 + " order by 1,2,3,4,5,6,7,8,9,10,11,12,13")
+
+    explain {
+        sql("${sql_3}")
+        contains "${mv_3}(${mv_3})"
+    }
+    compare_res(sql_3 + " order by 1,2,3,4,5,6,7,8,9,10,11,12,13")
+
+    explain {
+        sql("${sql_4}")
+        contains "${mv_4}(${mv_4})"
+    }
+    compare_res(sql_4 + " order by 1,2,3,4,5,6,7,8,9,10,11,12,13")
+
+    explain {
+        sql("${sql_5}")
+        contains "${mv_5}(${mv_5})"
+    }
+    compare_res(sql_5 + " order by 1,2,3,4,5,6,7,8,9,10,11,12,13")
+
+
+
+
+}
diff --git a/regression-test/suites/nereids_rules_p0/mv/partition_mv_rewrite.groovy b/regression-test/suites/nereids_rules_p0/mv/partition_mv_rewrite.groovy
index e799c01fff9..1d34e9617da 100644
--- a/regression-test/suites/nereids_rules_p0/mv/partition_mv_rewrite.groovy
+++ b/regression-test/suites/nereids_rules_p0/mv/partition_mv_rewrite.groovy
@@ -166,12 +166,29 @@ suite("partition_mv_rewrite") {
     // wait partition is invalid
     sleep(5000)
     // only can use valid partition
+    sql "SET enable_materialized_view_union_rewrite=false"
+    // Test querying all partitions when enable_materialized_view_union_rewrite is disabled
+    order_qt_query_all_direct_before "${all_partition_sql}"
     explain {
         sql("${all_partition_sql}")
         notContains("${mv_name}(${mv_name})")
     }
+    order_qt_query_all_direct_after "${all_partition_sql}"
+
+    // Test querying a subset of partitions when enable_materialized_view_union_rewrite is disabled
+    order_qt_query_partition_before "${partition_sql}"
     explain {
         sql("${partition_sql}")
         contains("${mv_name}(${mv_name})")
     }
+    order_qt_query_partition_after "${partition_sql}"
+
+    // Test querying all partitions when enable_materialized_view_union_rewrite is enabled
+    sql "SET enable_materialized_view_union_rewrite=true"
+    order_qt_query_all_before "${all_partition_sql}"
+    explain {
+        sql("${all_partition_sql}")
+        contains("${mv_name}(${mv_name})")
+    }
+    order_qt_query_all_after "${all_partition_sql}"
 }
diff --git a/regression-test/suites/nereids_rules_p0/mv/partition_mv_rewrite.groovy b/regression-test/suites/nereids_rules_p0/mv/union_rewrite/partition_curd_union_rewrite.groovy
similarity index 66%
copy from regression-test/suites/nereids_rules_p0/mv/partition_mv_rewrite.groovy
copy to regression-test/suites/nereids_rules_p0/mv/union_rewrite/partition_curd_union_rewrite.groovy
index e799c01fff9..cfdd327c0d2 100644
--- a/regression-test/suites/nereids_rules_p0/mv/partition_mv_rewrite.groovy
+++ b/regression-test/suites/nereids_rules_p0/mv/union_rewrite/partition_curd_union_rewrite.groovy
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-suite("partition_mv_rewrite") {
+suite ("partition_curd_union_rewrite") {
     String db = context.config.getDbNameByFile(context.file)
     sql "use ${db}"
     sql "SET enable_nereids_planner=true"
@@ -42,7 +42,7 @@ suite("partition_mv_rewrite") {
     )
     DUPLICATE KEY(o_orderkey, o_custkey)
     PARTITION BY RANGE(o_orderdate)(
-    FROM ('2023-10-17') TO ('2023-10-20') INTERVAL 1 DAY
+    FROM ('2023-10-17') TO ('2023-11-01') INTERVAL 1 DAY
     )
     DISTRIBUTED BY HASH(o_orderkey) BUCKETS 3
     PROPERTIES (
@@ -75,7 +75,7 @@ suite("partition_mv_rewrite") {
     )
     DUPLICATE KEY(l_orderkey, l_partkey, l_suppkey, l_linenumber)
     PARTITION BY RANGE(l_shipdate) 
-    (FROM ('2023-10-17') TO ('2023-10-20') INTERVAL 1 DAY)
+    (FROM ('2023-10-17') TO ('2023-11-01') INTERVAL 1 DAY)
     DISTRIBUTED BY HASH(l_orderkey) BUCKETS 3
     PROPERTIES (
       "replication_num" = "1"
@@ -106,7 +106,7 @@ suite("partition_mv_rewrite") {
     l_shipdate,
     o_orderdate,
     l_partkey,
-    l_suppkey;
+    l_suppkey
     """
 
     def all_partition_sql = """
@@ -117,7 +117,7 @@ suite("partition_mv_rewrite") {
     l_shipdate,
     o_orderdate,
     l_partkey,
-    l_suppkey;
+    l_suppkey
    """
 
 
@@ -130,7 +130,7 @@ suite("partition_mv_rewrite") {
     l_shipdate,
     o_orderdate,
     l_partkey,
-    l_suppkey;
+    l_suppkey
     """
 
     sql """DROP MATERIALIZED VIEW IF EXISTS mv_10086"""
@@ -145,33 +145,94 @@ suite("partition_mv_rewrite") {
         ${mv_def_sql}
         """
 
-    def mv_name = "mv_10086"
 
-    def job_name = getJobName(db, mv_name);
-    waitingMTMVTaskFinished(job_name)
+    def compare_res = { def stmt ->
+        sql "SET enable_materialized_view_rewrite=false"
+        def origin_res = sql stmt
+        logger.info("origin_res: " + origin_res)
+        sql "SET enable_materialized_view_rewrite=true"
+        def mv_origin_res = sql stmt
+        logger.info("mv_origin_res: " + mv_origin_res)
+        assertTrue((mv_origin_res == [] && origin_res == []) || (mv_origin_res.size() == origin_res.size()))
+        for (int row = 0; row < mv_origin_res.size(); row++) {
+            assertTrue(mv_origin_res[row].size() == origin_res[row].size())
+            for (int col = 0; col < mv_origin_res[row].size(); col++) {
+                assertTrue(mv_origin_res[row][col] == origin_res[row][col])
+            }
+        }
+    }
+
+    def mv_name = "mv_10086"
+    def order_by_stmt = " order by 1,2,3,4,5"
+    waitingMTMVTaskFinished(getJobName(db, mv_name))
 
+    // All partitions are valid; test that the query is rewritten by the materialized view
     explain {
         sql("${all_partition_sql}")
         contains("${mv_name}(${mv_name})")
     }
+    compare_res(all_partition_sql + order_by_stmt)
     explain {
         sql("${partition_sql}")
         contains("${mv_name}(${mv_name})")
     }
-    // partition is invalid, so can not use partition 2023-10-17 to rewrite
+    compare_res(partition_sql + order_by_stmt)
+
+    /*
+    // Some partitions are invalid; test that partition 2023-10-17 cannot be used for the rewrite
     sql """
     insert into lineitem values 
     (1, 2, 3, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy');
     """
     // wait partition is invalid
     sleep(5000)
-    // only can use valid partition
     explain {
         sql("${all_partition_sql}")
-        notContains("${mv_name}(${mv_name})")
+        contains("${mv_name}(${mv_name})")
+    }
+    compare_res(all_partition_sql + order_by_stmt)
+    explain {
+        sql("${partition_sql}")
+        contains("${mv_name}(${mv_name})")
+    }
+    compare_res(partition_sql + order_by_stmt)
+
+    sql "REFRESH MATERIALIZED VIEW ${mv_name} AUTO"
+    waitingMTMVTaskFinished(getJobName(db, mv_name))
+    // Test when the base table creates a partition
+    sql """
+    insert into lineitem values 
+    (1, 2, 3, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-21', '2023-10-21', '2023-10-21', 'a', 'b', 'yyyyyyyyy');
+    """
+    // Wait for the partition to become invalid
+    sleep(5000)
+    explain {
+        sql("${all_partition_sql}")
+        contains("${mv_name}(${mv_name})")
+    }
+    compare_res(all_partition_sql + order_by_stmt)
+    explain {
+        sql("${partition_sql}")
+        contains("${mv_name}(${mv_name})")
+    }
+    compare_res(partition_sql + order_by_stmt)
+
+    // Test when the base table drops a partition
+    sql "REFRESH MATERIALIZED VIEW ${mv_name} AUTO"
+    waitingMTMVTaskFinished(getJobName(db, mv_name))
+    sql """ ALTER TABLE lineitem DROP PARTITION IF EXISTS p_20231021 FORCE;
+    """
+    // Wait for the partition to become invalid
+    sleep(3000)
+    explain {
+        sql("${all_partition_sql}")
+        contains("${mv_name}(${mv_name})")
     }
+    compare_res(all_partition_sql + order_by_stmt)
     explain {
         sql("${partition_sql}")
         contains("${mv_name}(${mv_name})")
     }
+    compare_res(partition_sql + order_by_stmt)
+     */
 }
diff --git a/regression-test/suites/nereids_rules_p0/mv/union_rewrite/usercase_union_rewrite.groovy b/regression-test/suites/nereids_rules_p0/mv/union_rewrite/usercase_union_rewrite.groovy
new file mode 100644
index 00000000000..1e474abc8ff
--- /dev/null
+++ b/regression-test/suites/nereids_rules_p0/mv/union_rewrite/usercase_union_rewrite.groovy
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite ("usercase_union_rewrite") {
+    String db = context.config.getDbNameByFile(context.file)
+    sql "use ${db}"
+    sql "SET enable_nereids_planner=true"
+    sql "SET enable_fallback_to_original_planner=false"
+    sql "SET enable_materialized_view_rewrite=true"
+    sql "SET enable_nereids_timeout = false"
+
+    sql """
+    drop table if exists orders_user
+    """
+
+    sql """CREATE TABLE `orders_user` (
+      `o_orderkey` BIGINT NULL,
+      `o_custkey` INT NULL,
+      `o_orderstatus` VARCHAR(1) NULL,
+      `o_totalprice` DECIMAL(15, 2)  NULL,
+      `o_orderpriority` VARCHAR(15) NULL,
+      `o_clerk` VARCHAR(15) NULL,
+      `o_shippriority` INT NULL,
+      `o_comment` VARCHAR(79) NULL,
+      `o_orderdate` DATE not NULL
+    ) ENGINE=OLAP
+    DUPLICATE KEY(`o_orderkey`, `o_custkey`)
+    COMMENT 'OLAP'
+    auto partition by range (date_trunc(`o_orderdate`, 'day')) ()
+    DISTRIBUTED BY HASH(`o_orderkey`) BUCKETS 96
+    PROPERTIES (
+    "replication_allocation" = "tag.location.default: 1"
+    );"""
+
+    sql """
+    drop table if exists lineitem_user
+    """
+
+    sql """CREATE TABLE `lineitem_user` (
+      `l_orderkey` BIGINT NULL,
+      `l_linenumber` INT NULL,
+      `l_partkey` INT NULL,
+      `l_suppkey` INT NULL,
+      `l_quantity` DECIMAL(15, 2) NULL,
+      `l_extendedprice` DECIMAL(15, 2) NULL,
+      `l_discount` DECIMAL(15, 2) NULL,
+      `l_tax` DECIMAL(15, 2) NULL,
+      `l_returnflag` VARCHAR(1) NULL,
+      `l_linestatus` VARCHAR(1) NULL,
+      `l_commitdate` DATE NULL,
+      `l_receiptdate` DATE NULL,
+      `l_shipinstruct` VARCHAR(25) NULL,
+      `l_shipmode` VARCHAR(10) NULL,
+      `l_comment` VARCHAR(44) NULL,
+      `l_shipdate` DATE not NULL
+    ) ENGINE=OLAP
+    DUPLICATE KEY(l_orderkey, l_linenumber, l_partkey, l_suppkey )
+    COMMENT 'OLAP'
+    auto partition by range (date_trunc(`l_shipdate`, 'day')) ()
+    DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
+    PROPERTIES (
+    "replication_allocation" = "tag.location.default: 1"
+    );"""
+
+    sql """
+    insert into orders_user values 
+    (1, 3, 'o', 99.5, 'a', 'b', null, 'yy', '2023-10-19'),
+    (2, 1, 'o', 109.2, 'c','d',2, null, '2023-10-18'),
+    (3, 2, 'k', 99.5, 'a', 'b', 1, 'yy', '2023-10-17'),
+    (4, 5, 'k', 99.5, 'a', 'b', 1, 'yy', '2023-10-19'); 
+    """
+
+    sql """
+    insert into lineitem_user values 
+    (2, 3, 2, 1, 5.5, 6.5, 7.5, 8.5, 'o', 'k', null, '2023-10-18', 'a', 'b', 'yyyyyyyyy', '2023-10-18'),
+    (3, 1, 1, 2, 7.5, 8.5, 9.5, 10.5, 'k', 'o', '2023-10-19', null, 'c', 'd', 'xxxxxxxxx', '2023-10-19'),
+    (1, 3, 2, 2, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy', '2023-10-17');
+    """
+
+    sql """analyze table orders_user with sync;"""
+    sql """analyze table lineitem_user with sync;"""
+
+    def create_mv_orders = { mv_name, mv_sql ->
+        sql """DROP MATERIALIZED VIEW IF EXISTS ${mv_name};"""
+        sql """DROP TABLE IF EXISTS ${mv_name}"""
+        sql"""
+        CREATE MATERIALIZED VIEW ${mv_name} 
+        BUILD IMMEDIATE REFRESH AUTO ON MANUAL 
+        partition by(o_orderdate) 
+        DISTRIBUTED BY RANDOM BUCKETS 2 
+        PROPERTIES ('replication_num' = '1') 
+        AS  
+        ${mv_sql}
+        """
+    }
+
+    def compare_res = { def stmt ->
+        sql "SET enable_materialized_view_rewrite=false"
+        def origin_res = sql stmt
+        logger.info("origin_res: " + origin_res)
+        sql "SET enable_materialized_view_rewrite=true"
+        def mv_origin_res = sql stmt
+        logger.info("mv_origin_res: " + mv_origin_res)
+        assertTrue((mv_origin_res == [] && origin_res == []) || (mv_origin_res.size() == origin_res.size()))
+        for (int row = 0; row < mv_origin_res.size(); row++) {
+            assertTrue(mv_origin_res[row].size() == origin_res[row].size())
+            for (int col = 0; col < mv_origin_res[row].size(); col++) {
+                assertTrue(mv_origin_res[row][col] == origin_res[row][col])
+            }
+        }
+    }
+
+    def mv_name = "mv_usercase"
+    def mv_stmt = """select o_orderdatE, o_shippriority, o_comment, o_orderdate, 
+        sum(o_totalprice) as sum_total, 
+        max(o_totalpricE) as max_total, 
+        min(o_totalprice) as min_total, 
+        count(*) as count_all, 
+        bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, 
+        bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 
+        from (select * from orders_user) as t1
+        group by 
+        o_orderdatE, 
+        o_shippriority, 
+        o_comment,
+        o_orderdate
+        """
+    create_mv_orders(mv_name, mv_stmt)
+    def job_name_1 = getJobName(db, mv_name)
+    waitingMTMVTaskFinished(job_name_1)
+
+    def query_stmt = """select o_orderdatE, o_shippriority, o_comment, o_orderdate, 
+        sum(o_totalprice) as sum_total, 
+        max(o_totalpricE) as max_total, 
+        min(o_totalprice) as min_total, 
+        count(*) as count_all, 
+        bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, 
+        bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 
+        from (select * from orders_user) as t1
+        group by 
+        o_orderdatE, 
+        o_shippriority, 
+        o_comment,
+        o_orderdate
+        """
+    explain {
+        sql("${query_stmt}")
+        contains "${mv_name}(${mv_name})"
+    }
+    compare_res(query_stmt + " order by 1,2,3,4,5,6,7,8")
+
+    sql """insert into orders_user values (5, 5, 'k', 99.5, 'a', 'b', 1, 'yy', '2023-10-19');"""
+    sql "SET enable_materialized_view_union_rewrite=true"
+    sleep(10 * 1000)
+    explain {
+        sql("${query_stmt}")
+        contains "${mv_name}(${mv_name})"
+    }
+    compare_res(query_stmt + " order by 1,2,3,4,5,6,7,8")
+}

