You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2020/08/13 22:41:55 UTC

[arrow] branch master updated: ARROW-9714: [Rust] [DataFusion] Implement type coercion rule for limit and sort

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 7efc4f3  ARROW-9714: [Rust] [DataFusion] Implement type coercion rule for limit and sort
7efc4f3 is described below

commit 7efc4f336a99c90127c3d259cfa585e4f5a07c0c
Author: Andy Grove <an...@gmail.com>
AuthorDate: Thu Aug 13 16:41:08 2020 -0600

    ARROW-9714: [Rust] [DataFusion] Implement type coercion rule for limit and sort
    
    Closes #7949 from andygrove/ARROW-9714
    
    Authored-by: Andy Grove <an...@gmail.com>
    Signed-off-by: Andy Grove <an...@gmail.com>
---
 rust/datafusion/src/optimizer/type_coercion.rs | 50 ++++++++++++++++++++++++--
 1 file changed, 47 insertions(+), 3 deletions(-)

diff --git a/rust/datafusion/src/optimizer/type_coercion.rs b/rust/datafusion/src/optimizer/type_coercion.rs
index 78fcf52..676e712 100644
--- a/rust/datafusion/src/optimizer/type_coercion.rs
+++ b/rust/datafusion/src/optimizer/type_coercion.rs
@@ -166,13 +166,22 @@ impl<'a> OptimizerRule for TypeCoercionRule<'a> {
                     self.rewrite_expr_list(aggr_expr, input.schema())?,
                 )?
                 .build(),
+            LogicalPlan::Limit { n, input, .. } => {
+                LogicalPlanBuilder::from(&self.optimize(input)?)
+                    .limit(*n)?
+                    .build()
+            }
+            LogicalPlan::Sort { input, expr, .. } => {
+                LogicalPlanBuilder::from(&self.optimize(input)?)
+                    .sort(self.rewrite_expr_list(expr, input.schema())?)?
+                    .build()
+            }
+            // the following plan variants have no inputs and do not need to be rewritten
             LogicalPlan::TableScan { .. } => Ok(plan.clone()),
             LogicalPlan::InMemoryScan { .. } => Ok(plan.clone()),
             LogicalPlan::ParquetScan { .. } => Ok(plan.clone()),
             LogicalPlan::CsvScan { .. } => Ok(plan.clone()),
             LogicalPlan::EmptyRelation { .. } => Ok(plan.clone()),
-            LogicalPlan::Limit { .. } => Ok(plan.clone()),
-            LogicalPlan::Sort { .. } => Ok(plan.clone()),
             LogicalPlan::CreateExternalTable { .. } => Ok(plan.clone()),
         }
     }
@@ -183,11 +192,46 @@ mod tests {
     use super::*;
     use crate::execution::context::ExecutionContext;
     use crate::execution::physical_plan::csv::CsvReadOptions;
-    use crate::logicalplan::{col, Operator};
+    use crate::logicalplan::{aggregate_expr, col, lit, Operator};
     use crate::test::arrow_testdata_path;
     use arrow::datatypes::{DataType, Field, Schema};
 
     #[test]
+    fn test_all_operators() -> Result<()> {
+        let testdata = arrow_testdata_path();
+        let path = format!("{}/csv/aggregate_test_100.csv", testdata);
+
+        let options = CsvReadOptions::new().schema_infer_max_records(100);
+        let plan = LogicalPlanBuilder::scan_csv(&path, options, None)?
+            // filter clause needs the type coercion rule applied
+            .filter(col("c7").lt(&lit(5_u8)))?
+            .project(vec![col("c1"), col("c2")])?
+            .aggregate(
+                vec![col("c1")],
+                vec![aggregate_expr("SUM", col("c2"), DataType::Int64)],
+            )?
+            .sort(vec![col("c1")])?
+            .limit(10)?
+            .build()?;
+
+        let scalar_functions = HashMap::new();
+        let mut rule = TypeCoercionRule::new(&scalar_functions);
+        let plan = rule.optimize(&plan)?;
+
+        // check that the filter had a cast added
+        let plan_str = format!("{:?}", plan);
+        println!("{}", plan_str);
+        let expected_plan_str = "Limit: 10
+  Sort: #c1
+    Aggregate: groupBy=[[#c1]], aggr=[[SUM(#c2)]]
+      Projection: #c1, #c2
+        Selection: #c7 Lt CAST(UInt8(5) AS Int64)";
+        assert!(plan_str.starts_with(expected_plan_str));
+
+        Ok(())
+    }
+
+    #[test]
     fn test_with_csv_plan() -> Result<()> {
         let testdata = arrow_testdata_path();
         let path = format!("{}/csv/aggregate_test_100.csv", testdata);