Posted to commits@arrow.apache.org by ag...@apache.org on 2020/03/29 18:38:38 UTC

[arrow] branch master updated: ARROW-8259: [Rust] [DataFusion] ProjectionPushDown now respects LIMIT

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new fd51e9d  ARROW-8259: [Rust] [DataFusion] ProjectionPushDown now respects LIMIT
fd51e9d is described below

commit fd51e9d818036f57bd2da213dfe20c35597f1add
Author: Andy Grove <an...@gmail.com>
AuthorDate: Sun Mar 29 12:38:22 2020 -0600

    ARROW-8259: [Rust] [DataFusion] ProjectionPushDown now respects LIMIT
    
    ProjectionPushDown now respects LIMIT.
    
    I had to update the tests because they were not referencing any columns in the input file, which caused the new Arrow reader to fail. I will file a separate issue to add validation for this case.
    
    Closes #6753 from andygrove/ARROW-8259
    
    Authored-by: Andy Grove <an...@gmail.com>
    Signed-off-by: Andy Grove <an...@gmail.com>
---
 .../src/optimizer/projection_push_down.rs          | 39 +++++++++++++++++-----
 rust/datafusion/tests/sql.rs                       |  6 ++--
 2 files changed, 33 insertions(+), 12 deletions(-)
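The sketch below (not part of the commit itself) illustrates the behavior this patch enables; it is modeled on the table_limit test added in the diff. test_table_scan and assert_optimized_plan_eq are DataFusion test helpers assumed to be in scope, as in the projection_push_down.rs test module, and the expected plan text is taken from that test.

    // Minimal sketch, assuming the helpers from the projection_push_down.rs
    // test module (`use super::*;`) are in scope.
    use crate::logicalplan::Expr::*;
    use crate::logicalplan::ScalarValue;

    fn projection_push_down_respects_limit() -> Result<()> {
        // A scan over three columns: a, b, c.
        let table_scan = test_table_scan()?;

        // Roughly "SELECT c, a FROM test LIMIT 5" as a logical plan.
        let plan = LogicalPlanBuilder::from(&table_scan)
            .project(vec![Column(2), Column(0)])?
            .limit(Expr::Literal(ScalarValue::UInt32(5)))?
            .build()?;

        // Before this patch the optimizer returned the Limit node unchanged,
        // so the projection was never pushed into the TableScan. Now the
        // Limit's input is optimized and only columns 0 and 2 are read.
        let expected = "Limit: UInt32(5)\
        \n  Projection: #1, #0\
        \n    TableScan: test projection=Some([0, 2])";
        assert_optimized_plan_eq(&plan, expected);

        Ok(())
    }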

diff --git a/rust/datafusion/src/optimizer/projection_push_down.rs b/rust/datafusion/src/optimizer/projection_push_down.rs
index 6017b90..1161cac 100644
--- a/rust/datafusion/src/optimizer/projection_push_down.rs
+++ b/rust/datafusion/src/optimizer/projection_push_down.rs
@@ -151,15 +151,13 @@ impl ProjectionPushDown {
                     projection: Some(projection),
                 })
             }
-            LogicalPlan::Limit {
-                expr,
-                input,
-                schema,
-            } => Ok(LogicalPlan::Limit {
-                expr: expr.clone(),
-                input: input.clone(),
-                schema: schema.clone(),
-            }),
+            LogicalPlan::Limit { expr, input, .. } => {
+                // Note that limit expressions are scalar values so there is no need to
+                // rewrite them but we do need to optimize the input to the limit plan
+                LogicalPlanBuilder::from(&self.optimize_plan(&input, accum, mapping)?)
+                    .limit(expr.clone())?
+                    .build()
+            }
             LogicalPlan::CreateExternalTable {
                 schema,
                 name,
@@ -255,6 +253,7 @@ mod tests {
 
     use super::*;
     use crate::logicalplan::Expr::*;
+    use crate::logicalplan::ScalarValue;
     use crate::test::*;
     use arrow::datatypes::DataType;
     use std::sync::Arc;
@@ -348,6 +347,28 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn table_limit() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        assert_eq!(3, table_scan.schema().fields().len());
+        assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
+
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .project(vec![Column(2), Column(0)])?
+            .limit(Expr::Literal(ScalarValue::UInt32(5)))?
+            .build()?;
+
+        assert_fields_eq(&plan, vec!["c", "a"]);
+
+        let expected = "Limit: UInt32(5)\
+        \n  Projection: #1, #0\
+        \n    TableScan: test projection=Some([0, 2])";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
         let optimized_plan = optimize(plan).expect("failed to optimize plan");
         let formatted_plan = format!("{:?}", optimized_plan);
diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs
index c45af09..e10a9ce 100644
--- a/rust/datafusion/tests/sql.rs
+++ b/rust/datafusion/tests/sql.rs
@@ -253,9 +253,9 @@ fn csv_query_cast_literal() {
 fn csv_query_limit() {
     let mut ctx = ExecutionContext::new();
     register_aggregate_csv(&mut ctx);
-    let sql = "SELECT 0 FROM aggregate_test_100 LIMIT 2";
+    let sql = "SELECT c1 FROM aggregate_test_100 LIMIT 2";
     let actual = execute(&mut ctx, sql).join("\n");
-    let expected = "0\n0".to_string();
+    let expected = "\"c\"\n\"d\"".to_string();
     assert_eq!(expected, actual);
 }
 
@@ -283,7 +283,7 @@ fn csv_query_limit_with_same_nbr_of_rows() {
 fn csv_query_limit_zero() {
     let mut ctx = ExecutionContext::new();
     register_aggregate_csv(&mut ctx);
-    let sql = "SELECT 0 FROM aggregate_test_100 LIMIT 0";
+    let sql = "SELECT c1 FROM aggregate_test_100 LIMIT 0";
     let actual = execute(&mut ctx, sql).join("\n");
     let expected = "".to_string();
     assert_eq!(expected, actual);
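
For context, a minimal sketch of what the updated csv_query_limit test now exercises end to end. register_aggregate_csv and execute are helpers defined elsewhere in rust/datafusion/tests/sql.rs and are not shown in this diff; the expected rows assume the aggregate_test_100 CSV fixture used by those tests.

    // Sketch of the end-to-end path the updated test exercises, assuming the
    // sql.rs test helpers `register_aggregate_csv` and `execute`.
    fn limit_query_reads_a_real_column() {
        let mut ctx = ExecutionContext::new();
        register_aggregate_csv(&mut ctx);

        // Selecting a real column (c1) instead of the literal 0 keeps the
        // projection non-empty, which the new Arrow CSV reader requires.
        let sql = "SELECT c1 FROM aggregate_test_100 LIMIT 2";
        let actual = execute(&mut ctx, sql).join("\n");

        // With ProjectionPushDown respecting LIMIT, only c1 is read from the
        // file and only two rows are produced.
        let expected = "\"c\"\n\"d\"".to_string();
        assert_eq!(expected, actual);
    }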