You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/22 10:57:17 UTC

[GitHub] [arrow-datafusion] jackwener commented on a diff in pull request #4324: reimplement `eliminate_limit` to remove `global-state`.

jackwener commented on code in PR #4324:
URL: https://github.com/apache/arrow-datafusion/pull/4324#discussion_r1029180997


##########
datafusion/optimizer/src/eliminate_limit.rs:
##########
@@ -157,180 +80,148 @@ mod tests {
         sum,
     };
 
-    fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
-        let rule = EliminateLimit::new();
-        let optimized_plan = rule
+    fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> {
+        let optimized_plan = EliminateLimit::new()
+            .optimize(plan, &mut OptimizerConfig::new())
+            .expect("failed to optimize plan");
+        let formatted_plan = format!("{:?}", optimized_plan);
+        assert_eq!(formatted_plan, expected);
+        assert_eq!(plan.schema(), optimized_plan.schema());
+        Ok(())
+    }
+
+    fn assert_optimized_plan_eq_with_pushdown(
+        plan: &LogicalPlan,
+        expected: &str,
+    ) -> Result<()> {
+        let optimized_plan = LimitPushDown::new()
             .optimize(plan, &mut OptimizerConfig::new())
             .expect("failed to optimize plan");
+        let optimized_plan = EliminateLimit::new()
+            .optimize(&optimized_plan, &mut OptimizerConfig::new())
+            .expect("failed to optimize plan");
         let formatted_plan = format!("{:?}", optimized_plan);
         assert_eq!(formatted_plan, expected);
         assert_eq!(plan.schema(), optimized_plan.schema());
+        Ok(())
     }
 
     #[test]
-    fn limit_0_root() {
+    fn limit_0_root() -> Result<()> {
         let table_scan = test_table_scan().unwrap();
         let plan = LogicalPlanBuilder::from(table_scan)
-            .aggregate(vec![col("a")], vec![sum(col("b"))])
-            .unwrap()
-            .limit(0, Some(0))
-            .unwrap()
-            .build()
-            .unwrap();
+            .aggregate(vec![col("a")], vec![sum(col("b"))])?
+            .limit(0, Some(0))?
+            .build()?;
         // No aggregate / scan / limit
         let expected = "EmptyRelation";
-        assert_optimized_plan_eq(&plan, expected);
+        assert_optimized_plan_eq(&plan, expected)
     }
 
     #[test]
-    fn limit_0_nested() {
-        let table_scan = test_table_scan().unwrap();
+    fn limit_0_nested() -> Result<()> {
+        let table_scan = test_table_scan()?;
         let plan1 = LogicalPlanBuilder::from(table_scan.clone())
-            .aggregate(vec![col("a")], vec![sum(col("b"))])
-            .unwrap()
-            .build()
-            .unwrap();
+            .aggregate(vec![col("a")], vec![sum(col("b"))])?
+            .build()?;
         let plan = LogicalPlanBuilder::from(table_scan)
-            .aggregate(vec![col("a")], vec![sum(col("b"))])
-            .unwrap()
-            .limit(0, Some(0))
-            .unwrap()
-            .union(plan1)
-            .unwrap()
-            .build()
-            .unwrap();
+            .aggregate(vec![col("a")], vec![sum(col("b"))])?
+            .limit(0, Some(0))?
+            .union(plan1)?
+            .build()?;
 
         // Left side is removed
         let expected = "Union\
             \n  EmptyRelation\
             \n  Aggregate: groupBy=[[test.a]], aggr=[[SUM(test.b)]]\
             \n    TableScan: test";
-        assert_optimized_plan_eq(&plan, expected);
+        assert_optimized_plan_eq(&plan, expected)
     }
 
     #[test]
-    fn limit_fetch_with_ancestor_limit_skip() {
-        let table_scan = test_table_scan().unwrap();
+    fn limit_fetch_with_ancestor_limit_fetch() -> Result<()> {
+        let table_scan = test_table_scan()?;
         let plan = LogicalPlanBuilder::from(table_scan)
-            .aggregate(vec![col("a")], vec![sum(col("b"))])
-            .unwrap()
-            .limit(0, Some(2))
-            .unwrap()
-            .limit(2, None)
-            .unwrap()
-            .build()
-            .unwrap();
-
-        // No aggregate / scan / limit
-        let expected = "Limit: skip=2, fetch=None\
-            \n  EmptyRelation";
-        assert_optimized_plan_eq(&plan, expected);
-    }
-
-    #[test]
-    fn multi_limit_offset_sort_eliminate() {
-        let table_scan = test_table_scan().unwrap();
-        let plan = LogicalPlanBuilder::from(table_scan)
-            .aggregate(vec![col("a")], vec![sum(col("b"))])
-            .unwrap()
-            .limit(0, Some(2))
-            .unwrap()
-            .sort(vec![col("a")])
-            .unwrap()
-            .limit(2, Some(1))
-            .unwrap()
-            .build()
-            .unwrap();
-
-        let expected = "Limit: skip=2, fetch=1\
-            \n  Sort: test.a\
-            \n    EmptyRelation";
-        assert_optimized_plan_eq(&plan, expected);
-    }
-
-    #[test]
-    fn limit_fetch_with_ancestor_limit_fetch() {
-        let table_scan = test_table_scan().unwrap();
-        let plan = LogicalPlanBuilder::from(table_scan)
-            .aggregate(vec![col("a")], vec![sum(col("b"))])
-            .unwrap()
-            .limit(0, Some(2))
-            .unwrap()
-            .sort(vec![col("a")])
-            .unwrap()
-            .limit(0, Some(1))
-            .unwrap()
-            .build()
-            .unwrap();
+            .aggregate(vec![col("a")], vec![sum(col("b"))])?
+            .limit(0, Some(2))?
+            .sort(vec![col("a")])?
+            .limit(0, Some(1))?
+            .build()?;
 
         let expected = "Limit: skip=0, fetch=1\
             \n  Sort: test.a\
             \n    Limit: skip=0, fetch=2\
             \n      Aggregate: groupBy=[[test.a]], aggr=[[SUM(test.b)]]\
             \n        TableScan: test";
-        assert_optimized_plan_eq(&plan, expected);
+        assert_optimized_plan_eq(&plan, expected)
     }
 
     #[test]
-    fn limit_with_ancestor_limit() {
-        let table_scan = test_table_scan().unwrap();
+    fn limit_join_with_ancestor_limit() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let table_scan_inner = test_table_scan_with_name("test1")?;
         let plan = LogicalPlanBuilder::from(table_scan)
-            .aggregate(vec![col("a")], vec![sum(col("b"))])
-            .unwrap()
-            .limit(2, Some(1))
-            .unwrap()
-            .sort(vec![col("a")])
-            .unwrap()
-            .limit(3, Some(1))
-            .unwrap()
-            .build()
-            .unwrap();
-
-        let expected = "Limit: skip=3, fetch=1\
-            \n  Sort: test.a\
-            \n    EmptyRelation";
-        assert_optimized_plan_eq(&plan, expected);
-    }
-
-    #[test]
-    fn limit_join_with_ancestor_limit() {
-        let table_scan = test_table_scan().unwrap();
-        let table_scan_inner = test_table_scan_with_name("test1").unwrap();
-        let plan = LogicalPlanBuilder::from(table_scan)
-            .limit(2, Some(1))
-            .unwrap()
+            .limit(2, Some(1))?
             .join_using(
                 &table_scan_inner,
                 JoinType::Inner,
                 vec![Column::from_name("a".to_string())],
-            )
-            .unwrap()
-            .limit(3, Some(1))
-            .unwrap()
-            .build()
-            .unwrap();
+            )?
+            .limit(3, Some(1))?
+            .build()?;
 
         let expected = "Limit: skip=3, fetch=1\
             \n  Inner Join: Using test.a = test1.a\
             \n    Limit: skip=2, fetch=1\
             \n      TableScan: test\
             \n    TableScan: test1";
-        assert_optimized_plan_eq(&plan, expected);
+        assert_optimized_plan_eq(&plan, expected)
     }
 
     #[test]
-    fn remove_zero_offset() {
-        let table_scan = test_table_scan().unwrap();
+    fn remove_zero_offset() -> Result<()> {
+        let table_scan = test_table_scan()?;
         let plan = LogicalPlanBuilder::from(table_scan)
-            .aggregate(vec![col("a")], vec![sum(col("b"))])
-            .unwrap()
-            .limit(0, None)
-            .unwrap()
-            .build()
-            .unwrap();
+            .aggregate(vec![col("a")], vec![sum(col("b"))])?
+            .limit(0, None)?
+            .build()?;
 
         let expected = "Aggregate: groupBy=[[test.a]], aggr=[[SUM(test.b)]]\
             \n  TableScan: test";
-        assert_optimized_plan_eq(&plan, expected);
+        assert_optimized_plan_eq(&plan, expected)
+    }
+
+    #[test]
+    fn limit_fetch_with_ancestor_limit_skip() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .aggregate(vec![col("a")], vec![sum(col("b"))])?
+            .limit(0, Some(2))?
+            .limit(2, None)?
+            .build()?;
+
+        // No aggregate / scan / limit
+        let expected = "EmptyRelation";
+        assert_optimized_plan_eq_with_pushdown(&plan, expected)
+    }
+
+    #[test]
+    fn limit_with_ancestor_limit() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .aggregate(vec![col("a")], vec![sum(col("b"))])?
+            .limit(2, Some(1))?
+            .sort(vec![col("a")])?
+            .limit(3, Some(1))?
+            .build()?;
+
+        // After remove global-state, we don't record the parent <skip, fetch>
+        // So, bottom don't know parent info, so can't eliminate.
+        let expected = "Limit: skip=3, fetch=1\
+        \n  Sort: test.a, fetch=4\
+        \n    Limit: skip=2, fetch=1\
+        \n      Aggregate: groupBy=[[test.a]], aggr=[[SUM(test.b)]]\
+        \n        TableScan: test";
+        assert_optimized_plan_eq_with_pushdown(&plan, expected)

Review Comment:
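   Some cases need information recorded across plan nodes (here, the ancestor limit's skip/fetch).
   Without that global state, the inner `Limit: skip=2, fetch=1` is no longer eliminated in this test. That is a regression, but a minor one.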



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org