You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/05/17 18:36:59 UTC
[arrow-datafusion] branch master updated: Support for OFFSET in LogicalPlan (#2521)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new b1e3a521b Support for OFFSET in LogicalPlan (#2521)
b1e3a521b is described below
commit b1e3a521b7e0ad49c5430bc07bf3026aa2cbb231
Author: Jeremy Dyer <jd...@gmail.com>
AuthorDate: Tue May 17 14:36:54 2022 -0400
Support for OFFSET in LogicalPlan (#2521)
* Introduce support for OFFSET
* lint fixes
* Slightly modify existing test to include LIMIT and OFFSET
* Uncomment accidental comment out for pre-commit script
* OFFSET should come before LIMIT
* Check for OFFSET < 0 and add more tests
---
ballista/rust/core/proto/ballista.proto | 6 ++
ballista/rust/core/src/serde/logical_plan/mod.rs | 25 +++++++-
datafusion/core/src/logical_plan/builder.rs | 20 +++++--
datafusion/core/src/logical_plan/mod.rs | 2 +-
datafusion/core/src/logical_plan/plan.rs | 2 +-
.../core/src/optimizer/common_subexpr_eliminate.rs | 1 +
.../core/src/optimizer/projection_push_down.rs | 1 +
datafusion/core/src/optimizer/utils.rs | 7 ++-
datafusion/core/src/physical_plan/planner.rs | 5 ++
datafusion/core/src/sql/planner.rs | 69 ++++++++++++++++++++--
datafusion/expr/src/logical_plan/mod.rs | 5 +-
datafusion/expr/src/logical_plan/plan.rs | 19 ++++++
12 files changed, 148 insertions(+), 14 deletions(-)
diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index e7821dc10..1ceb412f1 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -54,6 +54,7 @@ message LogicalPlanNode {
CreateCatalogNode create_catalog = 20;
SubqueryAliasNode subquery_alias = 21;
CreateViewNode create_view = 22;
+ OffsetNode offset = 23;
}
}
@@ -251,6 +252,11 @@ message LimitNode {
uint32 limit = 2;
}
+message OffsetNode {
+ LogicalPlanNode input = 1;
+ uint32 offset = 2;
+}
+
message SelectionExecNode {
datafusion.LogicalExprNode expr = 1;
}
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index 1dd61a1e9..070dad98a 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -34,7 +34,7 @@ use datafusion::logical_plan::plan::{
use datafusion::logical_plan::{
source_as_provider, Column, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
CreateView, CrossJoin, Expr, JoinConstraint, Limit, LogicalPlan, LogicalPlanBuilder,
- Repartition, TableScan, Values,
+ Offset, Repartition, TableScan, Values,
};
use datafusion::prelude::SessionContext;
@@ -405,6 +405,14 @@ impl AsLogicalPlan for LogicalPlanNode {
.build()
.map_err(|e| e.into())
}
+ LogicalPlanType::Offset(offset) => {
+ let input: LogicalPlan =
+ into_logical_plan!(offset.input, ctx, extension_codec)?;
+ LogicalPlanBuilder::from(input)
+ .offset(offset.offset as usize)?
+ .build()
+ .map_err(|e| e.into())
+ }
LogicalPlanType::Join(join) => {
let left_keys: Vec<Column> =
join.left_join_column.iter().map(|i| i.into()).collect();
@@ -759,6 +767,21 @@ impl AsLogicalPlan for LogicalPlanNode {
))),
})
}
+ LogicalPlan::Offset(Offset { input, offset }) => {
+ let input: protobuf::LogicalPlanNode =
+ protobuf::LogicalPlanNode::try_from_logical_plan(
+ input.as_ref(),
+ extension_codec,
+ )?;
+ Ok(protobuf::LogicalPlanNode {
+ logical_plan_type: Some(LogicalPlanType::Offset(Box::new(
+ protobuf::OffsetNode {
+ input: Some(Box::new(input)),
+ offset: *offset as u32,
+ },
+ ))),
+ })
+ }
LogicalPlan::Sort(Sort { input, expr }) => {
let input: protobuf::LogicalPlanNode =
protobuf::LogicalPlanNode::try_from_logical_plan(
diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs
index 246eeb756..9e8f17815 100644
--- a/datafusion/core/src/logical_plan/builder.rs
+++ b/datafusion/core/src/logical_plan/builder.rs
@@ -42,7 +42,7 @@ use crate::logical_plan::expr::exprlist_to_fields;
use crate::logical_plan::{
columnize_expr, normalize_col, normalize_cols, provider_as_source,
rewrite_sort_cols_by_aggs, Column, CrossJoin, DFField, DFSchema, DFSchemaRef, Limit,
- Partitioning, Repartition, Values,
+ Offset, Partitioning, Repartition, Values,
};
use crate::sql::utils::group_window_expr_by_sort_keys;
use datafusion_common::ToDFSchema;
@@ -312,6 +312,14 @@ impl LogicalPlanBuilder {
})))
}
+ /// Apply an offset
+ pub fn offset(&self, offset: usize) -> Result<Self> {
+ Ok(Self::from(LogicalPlan::Offset(Offset {
+ offset,
+ input: Arc::new(self.plan.clone()),
+ })))
+ }
+
/// Apply an alias
pub fn alias(&self, alias: &str) -> Result<Self> {
let schema: Schema = self.schema().as_ref().clone().into();
@@ -991,11 +999,15 @@ mod tests {
vec![sum(col("salary")).alias("total_salary")],
)?
.project(vec![col("state"), col("total_salary")])?
+ .limit(10)?
+ .offset(2)?
.build()?;
- let expected = "Projection: #employee_csv.state, #total_salary\
- \n Aggregate: groupBy=[[#employee_csv.state]], aggr=[[SUM(#employee_csv.salary) AS total_salary]]\
- \n TableScan: employee_csv projection=Some([3, 4])";
+ let expected = "Offset: 2\
+ \n Limit: 10\
+ \n Projection: #employee_csv.state, #total_salary\
+ \n Aggregate: groupBy=[[#employee_csv.state]], aggr=[[SUM(#employee_csv.salary) AS total_salary]]\
+ \n TableScan: employee_csv projection=Some([3, 4])";
assert_eq!(expected, format!("{:?}", plan));
diff --git a/datafusion/core/src/logical_plan/mod.rs b/datafusion/core/src/logical_plan/mod.rs
index d4397c4b0..e9496c8f2 100644
--- a/datafusion/core/src/logical_plan/mod.rs
+++ b/datafusion/core/src/logical_plan/mod.rs
@@ -61,7 +61,7 @@ pub use plan::{provider_as_source, source_as_provider};
pub use plan::{
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
CreateView, CrossJoin, DropTable, EmptyRelation, FileType, JoinConstraint, JoinType,
- Limit, LogicalPlan, Partitioning, PlanType, PlanVisitor, Repartition,
+ Limit, LogicalPlan, Offset, Partitioning, PlanType, PlanVisitor, Repartition,
StringifiedPlan, Subquery, TableScan, ToStringifiedPlan, Union,
UserDefinedLogicalNode, Values,
};
diff --git a/datafusion/core/src/logical_plan/plan.rs b/datafusion/core/src/logical_plan/plan.rs
index e1adb4939..f12981707 100644
--- a/datafusion/core/src/logical_plan/plan.rs
+++ b/datafusion/core/src/logical_plan/plan.rs
@@ -27,7 +27,7 @@ pub use crate::logical_expr::{
Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
CreateMemoryTable, CreateView, CrossJoin, DropTable, EmptyRelation, Explain,
Extension, FileType, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan,
- Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort,
+ Offset, Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort,
StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
UserDefinedLogicalNode, Values, Window,
},
diff --git a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
index defb42289..af0eea663 100644
--- a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
+++ b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
@@ -220,6 +220,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
| LogicalPlan::Subquery(_)
| LogicalPlan::SubqueryAlias(_)
| LogicalPlan::Limit(_)
+ | LogicalPlan::Offset(_)
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::Explain { .. }
| LogicalPlan::Analyze { .. }
diff --git a/datafusion/core/src/optimizer/projection_push_down.rs b/datafusion/core/src/optimizer/projection_push_down.rs
index 6d5e80b1e..a1bd6efe1 100644
--- a/datafusion/core/src/optimizer/projection_push_down.rs
+++ b/datafusion/core/src/optimizer/projection_push_down.rs
@@ -469,6 +469,7 @@ fn optimize_plan(
// all other nodes: Add any additional columns used by
// expressions in this node to the list of required columns
LogicalPlan::Limit(_)
+ | LogicalPlan::Offset(_)
| LogicalPlan::Filter { .. }
| LogicalPlan::Repartition(_)
| LogicalPlan::EmptyRelation(_)
diff --git a/datafusion/core/src/optimizer/utils.rs b/datafusion/core/src/optimizer/utils.rs
index 1a1f9edce..96ba969c4 100644
--- a/datafusion/core/src/optimizer/utils.rs
+++ b/datafusion/core/src/optimizer/utils.rs
@@ -27,7 +27,8 @@ use datafusion_expr::logical_plan::{
use crate::error::{DataFusionError, Result};
use crate::logical_plan::{
and, build_join_schema, CreateMemoryTable, CreateView, DFSchemaRef, Expr, Limit,
- LogicalPlan, LogicalPlanBuilder, Operator, Partitioning, Repartition, Union, Values,
+ LogicalPlan, LogicalPlanBuilder, Offset, Operator, Partitioning, Repartition, Union,
+ Values,
};
use crate::prelude::lit;
use crate::scalar::ScalarValue;
@@ -184,6 +185,10 @@ pub fn from_plan(
n: *n,
input: Arc::new(inputs[0].clone()),
})),
+ LogicalPlan::Offset(Offset { offset, .. }) => Ok(LogicalPlan::Offset(Offset {
+ offset: *offset,
+ input: Arc::new(inputs[0].clone()),
+ })),
LogicalPlan::CreateMemoryTable(CreateMemoryTable {
name,
if_not_exists,
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 77aee4102..af844cd1e 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -853,6 +853,11 @@ impl DefaultPhysicalPlanner {
Ok(Arc::new(GlobalLimitExec::new(input, limit)))
}
+ LogicalPlan::Offset(_) => {
+ Err(DataFusionError::Internal(
+ "Unsupported logical plan: OFFSET".to_string(),
+ ))
+ }
LogicalPlan::CreateExternalTable(_) => {
// There is no default plan for "CREATE EXTERNAL
// TABLE" -- it must be handled at a higher level (so
diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/core/src/sql/planner.rs
index cb02c3ed0..2518de78a 100644
--- a/datafusion/core/src/sql/planner.rs
+++ b/datafusion/core/src/sql/planner.rs
@@ -54,9 +54,10 @@ use datafusion_expr::expr::GroupingSet;
use datafusion_expr::logical_plan::{Filter, Subquery};
use sqlparser::ast::{
BinaryOperator, DataType as SQLDataType, DateTimeField, Expr as SQLExpr, FunctionArg,
- FunctionArgExpr, Ident, Join, JoinConstraint, JoinOperator, ObjectName, Query,
- Select, SelectItem, SetExpr, SetOperator, ShowStatementFilter, TableFactor,
- TableWithJoins, TrimWhereField, UnaryOperator, Value, Values as SQLValues,
+ FunctionArgExpr, Ident, Join, JoinConstraint, JoinOperator, ObjectName,
+ Offset as SQLOffset, Query, Select, SelectItem, SetExpr, SetOperator,
+ ShowStatementFilter, TableFactor, TableWithJoins, TrimWhereField, UnaryOperator,
+ Value, Values as SQLValues,
};
use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
use sqlparser::ast::{ObjectType, OrderByExpr, Statement};
@@ -293,6 +294,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let plan = self.order_by(plan, query.order_by)?;
+ let plan: LogicalPlan = self.offset(plan, query.offset)?;
+
self.limit(plan, query.limit)
}
@@ -1259,6 +1262,39 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
}
+ /// Wrap a plan in an offset
+ fn offset(
+ &self,
+ input: LogicalPlan,
+ offset: Option<SQLOffset>,
+ ) -> Result<LogicalPlan> {
+ match offset {
+ Some(offset_expr) => {
+ let offset = match self.sql_to_rex(
+ offset_expr.value,
+ input.schema(),
+ &mut HashMap::new(),
+ )? {
+ Expr::Literal(ScalarValue::Int64(Some(offset))) => {
+ if offset < 0 {
+ return Err(DataFusionError::Plan(format!(
+ "Offset must be >= 0, '{}' was provided.",
+ offset
+ )));
+ }
+ Ok(offset as usize)
+ }
+ _ => Err(DataFusionError::Plan(
+ "Unexpected expression in OFFSET clause".to_string(),
+ )),
+ }?;
+
+ LogicalPlanBuilder::from(input).offset(offset)?.build()
+ }
+ _ => Ok(input),
+ }
+ }
+
/// Wrap the logical in a sort
fn order_by(
&self,
@@ -4327,7 +4363,7 @@ mod tests {
fn logical_plan(sql: &str) -> Result<LogicalPlan> {
let planner = SqlToRel::new(&MockContextProvider {});
let result = DFParser::parse_sql(sql);
- let mut ast = result.unwrap();
+ let mut ast = result?;
planner.statement_to_plan(ast.pop_front().unwrap())
}
@@ -4784,6 +4820,31 @@ mod tests {
quick_test(sql, expected);
}
+ #[test]
+ fn test_offset_with_limit() {
+ let sql = "select id from person where person.id > 100 LIMIT 5 OFFSET 0;";
+ let expected = "Limit: 5\
+ \n Offset: 0\
+ \n Projection: #person.id\
+ \n Filter: #person.id > Int64(100)\
+ \n TableScan: person projection=None";
+ quick_test(sql, expected);
+
+ // Flip the order of LIMIT and OFFSET in the query. Plan should remain the same.
+ let sql = "SELECT id FROM person WHERE person.id > 100 OFFSET 0 LIMIT 5;";
+ quick_test(sql, expected);
+ }
+
+ #[test]
+ fn test_offset_no_limit() {
+ let sql = "SELECT id FROM person WHERE person.id > 100 OFFSET 5;";
+ let expected = "Offset: 5\
+ \n Projection: #person.id\
+ \n Filter: #person.id > Int64(100)\
+ \n TableScan: person projection=None";
+ quick_test(sql, expected);
+ }
+
fn assert_field_not_found(err: DataFusionError, name: &str) {
match err {
DataFusionError::SchemaError { .. } => {
diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs
index 3681ff14e..e9a90cd7f 100644
--- a/datafusion/expr/src/logical_plan/mod.rs
+++ b/datafusion/expr/src/logical_plan/mod.rs
@@ -23,8 +23,9 @@ pub use plan::{
Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
CreateMemoryTable, CreateView, CrossJoin, DropTable, EmptyRelation, Explain,
Extension, FileType, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan,
- Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort, StringifiedPlan,
- Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union, Values, Window,
+ Offset, Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort,
+ StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
+ Values, Window,
};
pub use display::display_schema;
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index ab96dbe73..50ca75fdf 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -73,6 +73,8 @@ pub enum LogicalPlan {
SubqueryAlias(SubqueryAlias),
/// Produces the first `n` tuples from its input and discards the rest.
Limit(Limit),
+ /// Adjusts the starting point at which the rest of the expressions begin to take effect
+ Offset(Offset),
/// Creates an external table.
CreateExternalTable(CreateExternalTable),
/// Creates an in memory table.
@@ -117,6 +119,7 @@ impl LogicalPlan {
LogicalPlan::CrossJoin(CrossJoin { schema, .. }) => schema,
LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(),
LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
+ LogicalPlan::Offset(Offset { input, .. }) => input.schema(),
LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(),
LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. }) => {
@@ -187,6 +190,7 @@ impl LogicalPlan {
| LogicalPlan::Sort(Sort { input, .. })
| LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
| LogicalPlan::CreateView(CreateView { input, .. })
+ | LogicalPlan::Offset(Offset { input, .. })
| LogicalPlan::Filter(Filter { input, .. }) => input.all_schemas(),
LogicalPlan::DropTable(_) => vec![],
}
@@ -235,6 +239,7 @@ impl LogicalPlan {
| LogicalPlan::Subquery(_)
| LogicalPlan::SubqueryAlias(_)
| LogicalPlan::Limit(_)
+ | LogicalPlan::Offset(_)
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::CreateMemoryTable(_)
| LogicalPlan::CreateView(_)
@@ -263,6 +268,7 @@ impl LogicalPlan {
LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => vec![left, right],
LogicalPlan::Limit(Limit { input, .. }) => vec![input],
+ LogicalPlan::Offset(Offset { input, .. }) => vec![input],
LogicalPlan::Subquery(Subquery { subquery, .. }) => vec![subquery],
LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input],
LogicalPlan::Extension(extension) => extension.node.inputs(),
@@ -403,6 +409,7 @@ impl LogicalPlan {
true
}
LogicalPlan::Limit(Limit { input, .. }) => input.accept(visitor)?,
+ LogicalPlan::Offset(Offset { input, .. }) => input.accept(visitor)?,
LogicalPlan::Subquery(Subquery { subquery, .. }) => {
subquery.accept(visitor)?
}
@@ -781,6 +788,9 @@ impl LogicalPlan {
}
},
LogicalPlan::Limit(Limit { ref n, .. }) => write!(f, "Limit: {}", n),
+ LogicalPlan::Offset(Offset { ref offset, .. }) => {
+ write!(f, "Offset: {}", offset)
+ }
LogicalPlan::Subquery(Subquery { subquery, .. }) => {
write!(f, "Subquery: {:?}", subquery)
}
@@ -1131,6 +1141,15 @@ pub struct Limit {
pub input: Arc<LogicalPlan>,
}
+/// Adjusts the starting point at which the rest of the expressions begin to take effect
+#[derive(Clone)]
+pub struct Offset {
+ /// The offset
+ pub offset: usize,
+ /// The logical plan
+ pub input: Arc<LogicalPlan>,
+}
+
/// Aggregates its input based on a set of grouping and aggregate
/// expressions (e.g. SUM).
#[derive(Clone)]