You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/05/17 18:36:59 UTC

[arrow-datafusion] branch master updated: Support for OFFSET in LogicalPlan (#2521)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new b1e3a521b Support for OFFSET in LogicalPlan (#2521)
b1e3a521b is described below

commit b1e3a521b7e0ad49c5430bc07bf3026aa2cbb231
Author: Jeremy Dyer <jd...@gmail.com>
AuthorDate: Tue May 17 14:36:54 2022 -0400

    Support for OFFSET in LogicalPlan (#2521)
    
    * Introduce support for OFFSET
    
    * lint fixes
    
    * Slightly modify existing test to include LIMIT and OFFSET
    
    * Uncomment accidental comment out for pre-commit script
    
    * OFFSET should come before LIMIT
    
    * Check for OFFSET <= 0 and add more tests
---
 ballista/rust/core/proto/ballista.proto            |  6 ++
 ballista/rust/core/src/serde/logical_plan/mod.rs   | 25 +++++++-
 datafusion/core/src/logical_plan/builder.rs        | 20 +++++--
 datafusion/core/src/logical_plan/mod.rs            |  2 +-
 datafusion/core/src/logical_plan/plan.rs           |  2 +-
 .../core/src/optimizer/common_subexpr_eliminate.rs |  1 +
 .../core/src/optimizer/projection_push_down.rs     |  1 +
 datafusion/core/src/optimizer/utils.rs             |  7 ++-
 datafusion/core/src/physical_plan/planner.rs       |  5 ++
 datafusion/core/src/sql/planner.rs                 | 69 ++++++++++++++++++++--
 datafusion/expr/src/logical_plan/mod.rs            |  5 +-
 datafusion/expr/src/logical_plan/plan.rs           | 19 ++++++
 12 files changed, 148 insertions(+), 14 deletions(-)

diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index e7821dc10..1ceb412f1 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -54,6 +54,7 @@ message LogicalPlanNode {
     CreateCatalogNode create_catalog = 20;
     SubqueryAliasNode subquery_alias = 21;
     CreateViewNode create_view = 22;
+    OffsetNode offset = 23;
   }
 }
 
@@ -251,6 +252,11 @@ message LimitNode {
   uint32 limit = 2;
 }
 
+message OffsetNode {
+  LogicalPlanNode input = 1;
+  uint32 offset = 2;
+}
+
 message SelectionExecNode {
   datafusion.LogicalExprNode expr = 1;
 }
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index 1dd61a1e9..070dad98a 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -34,7 +34,7 @@ use datafusion::logical_plan::plan::{
 use datafusion::logical_plan::{
     source_as_provider, Column, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
     CreateView, CrossJoin, Expr, JoinConstraint, Limit, LogicalPlan, LogicalPlanBuilder,
-    Repartition, TableScan, Values,
+    Offset, Repartition, TableScan, Values,
 };
 use datafusion::prelude::SessionContext;
 
@@ -405,6 +405,14 @@ impl AsLogicalPlan for LogicalPlanNode {
                     .build()
                     .map_err(|e| e.into())
             }
+            LogicalPlanType::Offset(offset) => {
+                let input: LogicalPlan =
+                    into_logical_plan!(offset.input, ctx, extension_codec)?;
+                LogicalPlanBuilder::from(input)
+                    .offset(offset.offset as usize)?
+                    .build()
+                    .map_err(|e| e.into())
+            }
             LogicalPlanType::Join(join) => {
                 let left_keys: Vec<Column> =
                     join.left_join_column.iter().map(|i| i.into()).collect();
@@ -759,6 +767,21 @@ impl AsLogicalPlan for LogicalPlanNode {
                     ))),
                 })
             }
+            LogicalPlan::Offset(Offset { input, offset }) => {
+                let input: protobuf::LogicalPlanNode =
+                    protobuf::LogicalPlanNode::try_from_logical_plan(
+                        input.as_ref(),
+                        extension_codec,
+                    )?;
+                Ok(protobuf::LogicalPlanNode {
+                    logical_plan_type: Some(LogicalPlanType::Offset(Box::new(
+                        protobuf::OffsetNode {
+                            input: Some(Box::new(input)),
+                            offset: *offset as u32,
+                        },
+                    ))),
+                })
+            }
             LogicalPlan::Sort(Sort { input, expr }) => {
                 let input: protobuf::LogicalPlanNode =
                     protobuf::LogicalPlanNode::try_from_logical_plan(
diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs
index 246eeb756..9e8f17815 100644
--- a/datafusion/core/src/logical_plan/builder.rs
+++ b/datafusion/core/src/logical_plan/builder.rs
@@ -42,7 +42,7 @@ use crate::logical_plan::expr::exprlist_to_fields;
 use crate::logical_plan::{
     columnize_expr, normalize_col, normalize_cols, provider_as_source,
     rewrite_sort_cols_by_aggs, Column, CrossJoin, DFField, DFSchema, DFSchemaRef, Limit,
-    Partitioning, Repartition, Values,
+    Offset, Partitioning, Repartition, Values,
 };
 use crate::sql::utils::group_window_expr_by_sort_keys;
 use datafusion_common::ToDFSchema;
@@ -312,6 +312,14 @@ impl LogicalPlanBuilder {
         })))
     }
 
+    /// Wrap a clone of this plan in an `Offset` node skipping its first `offset` rows
+    pub fn offset(&self, offset: usize) -> Result<Self> {
+        Ok(Self::from(LogicalPlan::Offset(Offset {
+            offset,
+            input: Arc::new(self.plan.clone()),
+        })))
+    }
+
     /// Apply an alias
     pub fn alias(&self, alias: &str) -> Result<Self> {
         let schema: Schema = self.schema().as_ref().clone().into();
@@ -991,11 +999,15 @@ mod tests {
             vec![sum(col("salary")).alias("total_salary")],
         )?
         .project(vec![col("state"), col("total_salary")])?
+        .limit(10)?
+        .offset(2)?
         .build()?;
 
-        let expected = "Projection: #employee_csv.state, #total_salary\
-        \n  Aggregate: groupBy=[[#employee_csv.state]], aggr=[[SUM(#employee_csv.salary) AS total_salary]]\
-        \n    TableScan: employee_csv projection=Some([3, 4])";
+        let expected = "Offset: 2\
+        \n  Limit: 10\
+        \n    Projection: #employee_csv.state, #total_salary\
+        \n      Aggregate: groupBy=[[#employee_csv.state]], aggr=[[SUM(#employee_csv.salary) AS total_salary]]\
+        \n        TableScan: employee_csv projection=Some([3, 4])";
 
         assert_eq!(expected, format!("{:?}", plan));
 
diff --git a/datafusion/core/src/logical_plan/mod.rs b/datafusion/core/src/logical_plan/mod.rs
index d4397c4b0..e9496c8f2 100644
--- a/datafusion/core/src/logical_plan/mod.rs
+++ b/datafusion/core/src/logical_plan/mod.rs
@@ -61,7 +61,7 @@ pub use plan::{provider_as_source, source_as_provider};
 pub use plan::{
     CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
     CreateView, CrossJoin, DropTable, EmptyRelation, FileType, JoinConstraint, JoinType,
-    Limit, LogicalPlan, Partitioning, PlanType, PlanVisitor, Repartition,
+    Limit, LogicalPlan, Offset, Partitioning, PlanType, PlanVisitor, Repartition,
     StringifiedPlan, Subquery, TableScan, ToStringifiedPlan, Union,
     UserDefinedLogicalNode, Values,
 };
diff --git a/datafusion/core/src/logical_plan/plan.rs b/datafusion/core/src/logical_plan/plan.rs
index e1adb4939..f12981707 100644
--- a/datafusion/core/src/logical_plan/plan.rs
+++ b/datafusion/core/src/logical_plan/plan.rs
@@ -27,7 +27,7 @@ pub use crate::logical_expr::{
         Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
         CreateMemoryTable, CreateView, CrossJoin, DropTable, EmptyRelation, Explain,
         Extension, FileType, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan,
-        Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort,
+        Offset, Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort,
         StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
         UserDefinedLogicalNode, Values, Window,
     },
diff --git a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
index defb42289..af0eea663 100644
--- a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
+++ b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
@@ -220,6 +220,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
         | LogicalPlan::Subquery(_)
         | LogicalPlan::SubqueryAlias(_)
         | LogicalPlan::Limit(_)
+        | LogicalPlan::Offset(_)
         | LogicalPlan::CreateExternalTable(_)
         | LogicalPlan::Explain { .. }
         | LogicalPlan::Analyze { .. }
diff --git a/datafusion/core/src/optimizer/projection_push_down.rs b/datafusion/core/src/optimizer/projection_push_down.rs
index 6d5e80b1e..a1bd6efe1 100644
--- a/datafusion/core/src/optimizer/projection_push_down.rs
+++ b/datafusion/core/src/optimizer/projection_push_down.rs
@@ -469,6 +469,7 @@ fn optimize_plan(
         // all other nodes: Add any additional columns used by
         // expressions in this node to the list of required columns
         LogicalPlan::Limit(_)
+        | LogicalPlan::Offset(_)
         | LogicalPlan::Filter { .. }
         | LogicalPlan::Repartition(_)
         | LogicalPlan::EmptyRelation(_)
diff --git a/datafusion/core/src/optimizer/utils.rs b/datafusion/core/src/optimizer/utils.rs
index 1a1f9edce..96ba969c4 100644
--- a/datafusion/core/src/optimizer/utils.rs
+++ b/datafusion/core/src/optimizer/utils.rs
@@ -27,7 +27,8 @@ use datafusion_expr::logical_plan::{
 use crate::error::{DataFusionError, Result};
 use crate::logical_plan::{
     and, build_join_schema, CreateMemoryTable, CreateView, DFSchemaRef, Expr, Limit,
-    LogicalPlan, LogicalPlanBuilder, Operator, Partitioning, Repartition, Union, Values,
+    LogicalPlan, LogicalPlanBuilder, Offset, Operator, Partitioning, Repartition, Union,
+    Values,
 };
 use crate::prelude::lit;
 use crate::scalar::ScalarValue;
@@ -184,6 +185,10 @@ pub fn from_plan(
             n: *n,
             input: Arc::new(inputs[0].clone()),
         })),
+        LogicalPlan::Offset(Offset { offset, .. }) => Ok(LogicalPlan::Offset(Offset {
+            offset: *offset,
+            input: Arc::new(inputs[0].clone()),
+        })),
         LogicalPlan::CreateMemoryTable(CreateMemoryTable {
             name,
             if_not_exists,
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 77aee4102..af844cd1e 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -853,6 +853,11 @@ impl DefaultPhysicalPlanner {
 
                     Ok(Arc::new(GlobalLimitExec::new(input, limit)))
                 }
+                LogicalPlan::Offset(_) => {
+                    Err(DataFusionError::Internal(
+                        "Unsupported logical plan: OFFSET".to_string(),
+                    ))
+                }
                 LogicalPlan::CreateExternalTable(_) => {
                     // There is no default plan for "CREATE EXTERNAL
                     // TABLE" -- it must be handled at a higher level (so
diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/core/src/sql/planner.rs
index cb02c3ed0..2518de78a 100644
--- a/datafusion/core/src/sql/planner.rs
+++ b/datafusion/core/src/sql/planner.rs
@@ -54,9 +54,10 @@ use datafusion_expr::expr::GroupingSet;
 use datafusion_expr::logical_plan::{Filter, Subquery};
 use sqlparser::ast::{
     BinaryOperator, DataType as SQLDataType, DateTimeField, Expr as SQLExpr, FunctionArg,
-    FunctionArgExpr, Ident, Join, JoinConstraint, JoinOperator, ObjectName, Query,
-    Select, SelectItem, SetExpr, SetOperator, ShowStatementFilter, TableFactor,
-    TableWithJoins, TrimWhereField, UnaryOperator, Value, Values as SQLValues,
+    FunctionArgExpr, Ident, Join, JoinConstraint, JoinOperator, ObjectName,
+    Offset as SQLOffset, Query, Select, SelectItem, SetExpr, SetOperator,
+    ShowStatementFilter, TableFactor, TableWithJoins, TrimWhereField, UnaryOperator,
+    Value, Values as SQLValues,
 };
 use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
 use sqlparser::ast::{ObjectType, OrderByExpr, Statement};
@@ -293,6 +294,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
 
         let plan = self.order_by(plan, query.order_by)?;
 
+        let plan: LogicalPlan = self.offset(plan, query.offset)?;
+
         self.limit(plan, query.limit)
     }
 
@@ -1259,6 +1262,39 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         }
     }
 
+    /// Wrap a plan in an `Offset` node if the query has an `OFFSET` clause
+    fn offset(
+        &self,
+        input: LogicalPlan,
+        offset: Option<SQLOffset>,
+    ) -> Result<LogicalPlan> {
+        match offset {
+            Some(offset_expr) => {
+                let offset = match self.sql_to_rex(
+                    offset_expr.value,
+                    input.schema(),
+                    &mut HashMap::new(),
+                )? {
+                    Expr::Literal(ScalarValue::Int64(Some(offset))) => {
+                        if offset < 0 {
+                            return Err(DataFusionError::Plan(format!(
+                                "Offset must be >= 0, '{}' was provided.",
+                                offset
+                            )));
+                        }
+                        Ok(offset as usize)
+                    }
+                    _ => Err(DataFusionError::Plan(
+                        "Unexpected expression in OFFSET clause".to_string(),
+                    )),
+                }?;
+
+                LogicalPlanBuilder::from(input).offset(offset)?.build()
+            }
+            _ => Ok(input),
+        }
+    }
+
     /// Wrap the logical in a sort
     fn order_by(
         &self,
@@ -4327,7 +4363,7 @@ mod tests {
     fn logical_plan(sql: &str) -> Result<LogicalPlan> {
         let planner = SqlToRel::new(&MockContextProvider {});
         let result = DFParser::parse_sql(sql);
-        let mut ast = result.unwrap();
+        let mut ast = result?;
         planner.statement_to_plan(ast.pop_front().unwrap())
     }
 
@@ -4784,6 +4820,31 @@ mod tests {
         quick_test(sql, expected);
     }
 
+    #[test]
+    fn test_offset_with_limit() {
+        let sql = "select id from person where person.id > 100 LIMIT 5 OFFSET 0;";
+        let expected = "Limit: 5\
+                                    \n  Offset: 0\
+                                    \n    Projection: #person.id\
+                                    \n      Filter: #person.id > Int64(100)\
+                                    \n        TableScan: person projection=None";
+        quick_test(sql, expected);
+
+        // Flip the order of LIMIT and OFFSET in the query. Plan should remain the same.
+        let sql = "SELECT id FROM person WHERE person.id > 100 OFFSET 0 LIMIT 5;";
+        quick_test(sql, expected);
+    }
+
+    #[test]
+    fn test_offset_no_limit() {
+        let sql = "SELECT id FROM person WHERE person.id > 100 OFFSET 5;";
+        let expected = "Offset: 5\
+                                    \n  Projection: #person.id\
+                                    \n    Filter: #person.id > Int64(100)\
+                                    \n      TableScan: person projection=None";
+        quick_test(sql, expected);
+    }
+
     fn assert_field_not_found(err: DataFusionError, name: &str) {
         match err {
             DataFusionError::SchemaError { .. } => {
diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs
index 3681ff14e..e9a90cd7f 100644
--- a/datafusion/expr/src/logical_plan/mod.rs
+++ b/datafusion/expr/src/logical_plan/mod.rs
@@ -23,8 +23,9 @@ pub use plan::{
     Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
     CreateMemoryTable, CreateView, CrossJoin, DropTable, EmptyRelation, Explain,
     Extension, FileType, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan,
-    Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort, StringifiedPlan,
-    Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union, Values, Window,
+    Offset, Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort,
+    StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
+    Values, Window,
 };
 
 pub use display::display_schema;
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index ab96dbe73..50ca75fdf 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -73,6 +73,8 @@ pub enum LogicalPlan {
     SubqueryAlias(SubqueryAlias),
     /// Produces the first `n` tuples from its input and discards the rest.
     Limit(Limit),
+    /// Discards the first `offset` tuples from its input and produces the rest.
+    Offset(Offset),
     /// Creates an external table.
     CreateExternalTable(CreateExternalTable),
     /// Creates an in memory table.
@@ -117,6 +119,7 @@ impl LogicalPlan {
             LogicalPlan::CrossJoin(CrossJoin { schema, .. }) => schema,
             LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(),
             LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
+            LogicalPlan::Offset(Offset { input, .. }) => input.schema(),
             LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(),
             LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
             LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. }) => {
@@ -187,6 +190,7 @@ impl LogicalPlan {
             | LogicalPlan::Sort(Sort { input, .. })
             | LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
             | LogicalPlan::CreateView(CreateView { input, .. })
+            | LogicalPlan::Offset(Offset { input, .. })
             | LogicalPlan::Filter(Filter { input, .. }) => input.all_schemas(),
             LogicalPlan::DropTable(_) => vec![],
         }
@@ -235,6 +239,7 @@ impl LogicalPlan {
             | LogicalPlan::Subquery(_)
             | LogicalPlan::SubqueryAlias(_)
             | LogicalPlan::Limit(_)
+            | LogicalPlan::Offset(_)
             | LogicalPlan::CreateExternalTable(_)
             | LogicalPlan::CreateMemoryTable(_)
             | LogicalPlan::CreateView(_)
@@ -263,6 +268,7 @@ impl LogicalPlan {
             LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
             LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => vec![left, right],
             LogicalPlan::Limit(Limit { input, .. }) => vec![input],
+            LogicalPlan::Offset(Offset { input, .. }) => vec![input],
             LogicalPlan::Subquery(Subquery { subquery, .. }) => vec![subquery],
             LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input],
             LogicalPlan::Extension(extension) => extension.node.inputs(),
@@ -403,6 +409,7 @@ impl LogicalPlan {
                 true
             }
             LogicalPlan::Limit(Limit { input, .. }) => input.accept(visitor)?,
+            LogicalPlan::Offset(Offset { input, .. }) => input.accept(visitor)?,
             LogicalPlan::Subquery(Subquery { subquery, .. }) => {
                 subquery.accept(visitor)?
             }
@@ -781,6 +788,9 @@ impl LogicalPlan {
                         }
                     },
                     LogicalPlan::Limit(Limit { ref n, .. }) => write!(f, "Limit: {}", n),
+                    LogicalPlan::Offset(Offset { ref offset, .. }) => {
+                        write!(f, "Offset: {}", offset)
+                    }
                     LogicalPlan::Subquery(Subquery { subquery, .. }) => {
                         write!(f, "Subquery: {:?}", subquery)
                     }
@@ -1131,6 +1141,15 @@ pub struct Limit {
     pub input: Arc<LogicalPlan>,
 }
 
+/// Discards the first `offset` tuples from its input and produces the rest
+#[derive(Clone)]
+pub struct Offset {
+    /// Number of leading input tuples to skip (from SQL `OFFSET n`)
+    pub offset: usize,
+    /// The input plan whose tuples are offset
+    pub input: Arc<LogicalPlan>,
+}
+
 /// Aggregates its input based on a set of grouping and aggregate
 /// expressions (e.g. SUM).
 #[derive(Clone)]