You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2020/08/13 22:41:55 UTC
[arrow] branch master updated: ARROW-9714: [Rust] [DataFusion] Implement type coercion rule for limit and sort
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 7efc4f3 ARROW-9714: [Rust] [DataFusion] Implement type coercion rule for limit and sort
7efc4f3 is described below
commit 7efc4f336a99c90127c3d259cfa585e4f5a07c0c
Author: Andy Grove <an...@gmail.com>
AuthorDate: Thu Aug 13 16:41:08 2020 -0600
ARROW-9714: [Rust] [DataFusion] Implement type coercion rule for limit and sort
Closes #7949 from andygrove/ARROW-9714
Authored-by: Andy Grove <an...@gmail.com>
Signed-off-by: Andy Grove <an...@gmail.com>
---
rust/datafusion/src/optimizer/type_coercion.rs | 50 ++++++++++++++++++++++++--
1 file changed, 47 insertions(+), 3 deletions(-)
diff --git a/rust/datafusion/src/optimizer/type_coercion.rs b/rust/datafusion/src/optimizer/type_coercion.rs
index 78fcf52..676e712 100644
--- a/rust/datafusion/src/optimizer/type_coercion.rs
+++ b/rust/datafusion/src/optimizer/type_coercion.rs
@@ -166,13 +166,22 @@ impl<'a> OptimizerRule for TypeCoercionRule<'a> {
self.rewrite_expr_list(aggr_expr, input.schema())?,
)?
.build(),
+ LogicalPlan::Limit { n, input, .. } => {
+ LogicalPlanBuilder::from(&self.optimize(input)?)
+ .limit(*n)?
+ .build()
+ }
+ LogicalPlan::Sort { input, expr, .. } => {
+ LogicalPlanBuilder::from(&self.optimize(input)?)
+ .sort(self.rewrite_expr_list(expr, input.schema())?)?
+ .build()
+ }
+ // the following rules do not have inputs and do not need to be re-written
LogicalPlan::TableScan { .. } => Ok(plan.clone()),
LogicalPlan::InMemoryScan { .. } => Ok(plan.clone()),
LogicalPlan::ParquetScan { .. } => Ok(plan.clone()),
LogicalPlan::CsvScan { .. } => Ok(plan.clone()),
LogicalPlan::EmptyRelation { .. } => Ok(plan.clone()),
- LogicalPlan::Limit { .. } => Ok(plan.clone()),
- LogicalPlan::Sort { .. } => Ok(plan.clone()),
LogicalPlan::CreateExternalTable { .. } => Ok(plan.clone()),
}
}
@@ -183,11 +192,46 @@ mod tests {
use super::*;
use crate::execution::context::ExecutionContext;
use crate::execution::physical_plan::csv::CsvReadOptions;
- use crate::logicalplan::{col, Operator};
+ use crate::logicalplan::{aggregate_expr, col, lit, Operator};
use crate::test::arrow_testdata_path;
use arrow::datatypes::{DataType, Field, Schema};
#[test]
+ fn test_all_operators() -> Result<()> {
+ let testdata = arrow_testdata_path();
+ let path = format!("{}/csv/aggregate_test_100.csv", testdata);
+
+ let options = CsvReadOptions::new().schema_infer_max_records(100);
+ let plan = LogicalPlanBuilder::scan_csv(&path, options, None)?
+ // filter clause needs the type coercion rule applied
+ .filter(col("c7").lt(&lit(5_u8)))?
+ .project(vec![col("c1"), col("c2")])?
+ .aggregate(
+ vec![col("c1")],
+ vec![aggregate_expr("SUM", col("c2"), DataType::Int64)],
+ )?
+ .sort(vec![col("c1")])?
+ .limit(10)?
+ .build()?;
+
+ let scalar_functions = HashMap::new();
+ let mut rule = TypeCoercionRule::new(&scalar_functions);
+ let plan = rule.optimize(&plan)?;
+
+ // check that the filter had a cast added
+ let plan_str = format!("{:?}", plan);
+ println!("{}", plan_str);
+ let expected_plan_str = "Limit: 10
+ Sort: #c1
+ Aggregate: groupBy=[[#c1]], aggr=[[SUM(#c2)]]
+ Projection: #c1, #c2
+ Selection: #c7 Lt CAST(UInt8(5) AS Int64)";
+ assert!(plan_str.starts_with(expected_plan_str));
+
+ Ok(())
+ }
+
+ #[test]
fn test_with_csv_plan() -> Result<()> {
let testdata = arrow_testdata_path();
let path = format!("{}/csv/aggregate_test_100.csv", testdata);