You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/08/25 09:34:27 UTC
[arrow-datafusion] branch master updated: Add assertion for invariant in `create_physical_expression` and fix ViewTable projection (#3242)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 92110ddba Add assertion for invariant in `create_physical_expression` and fix ViewTable projection (#3242)
92110ddba is described below
commit 92110ddba004ca585649d1ba97dcb67f20fc0779
Author: Andy Grove <an...@gmail.com>
AuthorDate: Thu Aug 25 03:34:21 2022 -0600
Add assertion for invariant in `create_physical_expression` and fix ViewTable projection (#3242)
---
datafusion/core/src/datasource/view.rs | 53 +++++++++++++++++++++++-
datafusion/optimizer/src/expr_simplifier.rs | 2 +-
datafusion/optimizer/src/simplify_expressions.rs | 16 ++++---
datafusion/physical-expr/src/planner.rs | 11 ++++-
4 files changed, 69 insertions(+), 13 deletions(-)
diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view.rs
index 1809f2a08..ef7e12aa2 100644
--- a/datafusion/core/src/datasource/view.rs
+++ b/datafusion/core/src/datasource/view.rs
@@ -21,6 +21,7 @@ use std::{any::Any, sync::Arc};
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
+use datafusion_expr::LogicalPlanBuilder;
use crate::{
error::Result,
@@ -81,14 +82,36 @@ impl TableProvider for ViewTable {
async fn scan(
&self,
state: &SessionState,
- _projection: &Option<Vec<usize>>,
+ projection: &Option<Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
// clone state and start_execution so that now() works in views
let mut state_cloned = state.clone();
state_cloned.execution_props.start_execution();
- state_cloned.create_physical_plan(&self.logical_plan).await
+ if let Some(projection) = projection {
+ // avoiding adding a redundant projection (e.g. SELECT * FROM view)
+ let current_projection =
+ (0..self.logical_plan.schema().fields().len()).collect::<Vec<usize>>();
+ if projection == &current_projection {
+ state_cloned.create_physical_plan(&self.logical_plan).await
+ } else {
+ let fields: Vec<Expr> = projection
+ .iter()
+ .map(|i| {
+ Expr::Column(
+ self.logical_plan.schema().field(*i).qualified_column(),
+ )
+ })
+ .collect();
+ let plan = LogicalPlanBuilder::from(self.logical_plan.clone())
+ .project(fields)?
+ .build()?;
+ state_cloned.create_physical_plan(&plan).await
+ }
+ } else {
+ state_cloned.create_physical_plan(&self.logical_plan).await
+ }
}
}
@@ -99,6 +122,32 @@ mod tests {
use super::*;
+ #[tokio::test]
+ async fn issue_3242() -> Result<()> {
+ // regression test for https://github.com/apache/arrow-datafusion/pull/3242
+ let session_ctx = SessionContext::with_config(
+ SessionConfig::new().with_information_schema(true),
+ );
+
+ session_ctx
+ .sql("create view v as select 1 as a, 2 as b, 3 as c")
+ .await?
+ .collect()
+ .await?;
+
+ let results = session_ctx
+ .sql("select * from (select b from v)")
+ .await?
+ .collect()
+ .await?;
+
+ let expected = vec!["+---+", "| b |", "+---+", "| 2 |", "+---+"];
+
+ assert_batches_eq!(expected, &results);
+
+ Ok(())
+ }
+
#[tokio::test]
async fn create_view_return_empty_dataframe() -> Result<()> {
let session_ctx = SessionContext::new();
diff --git a/datafusion/optimizer/src/expr_simplifier.rs b/datafusion/optimizer/src/expr_simplifier.rs
index d71ecdaa2..7cf5f02c6 100644
--- a/datafusion/optimizer/src/expr_simplifier.rs
+++ b/datafusion/optimizer/src/expr_simplifier.rs
@@ -88,7 +88,7 @@ impl ExprSimplifiable for Expr {
/// ```
fn simplify<S: SimplifyInfo>(self, info: &S) -> Result<Self> {
let mut rewriter = Simplifier::new(info);
- let mut const_evaluator = ConstEvaluator::new(info.execution_props());
+ let mut const_evaluator = ConstEvaluator::try_new(info.execution_props())?;
// TODO iterate until no changes are made during rewrite
// (evaluating constants can enable new simplifications and
diff --git a/datafusion/optimizer/src/simplify_expressions.rs b/datafusion/optimizer/src/simplify_expressions.rs
index 34ec24fb6..384fd09ae 100644
--- a/datafusion/optimizer/src/simplify_expressions.rs
+++ b/datafusion/optimizer/src/simplify_expressions.rs
@@ -332,7 +332,7 @@ impl SimplifyExpressions {
/// # use datafusion_expr::expr_rewriter::ExprRewritable;
///
/// let execution_props = ExecutionProps::new();
-/// let mut const_evaluator = ConstEvaluator::new(&execution_props);
+/// let mut const_evaluator = ConstEvaluator::try_new(&execution_props).unwrap();
///
/// // (1 + 2) + a
/// let expr = (lit(1) + lit(2)) + col("a");
@@ -403,25 +403,23 @@ impl<'a> ConstEvaluator<'a> {
/// Create a new `ConstantEvaluator`. Session constants (such as
/// the time for `now()` are taken from the passed
/// `execution_props`.
- pub fn new(execution_props: &'a ExecutionProps) -> Self {
- let input_schema = DFSchema::empty();
-
+ pub fn try_new(execution_props: &'a ExecutionProps) -> Result<Self> {
// The dummy column name is unused and doesn't matter as only
// expressions without column references can be evaluated
static DUMMY_COL_NAME: &str = ".";
let schema = Schema::new(vec![Field::new(DUMMY_COL_NAME, DataType::Null, true)]);
+ let input_schema = DFSchema::try_from(schema.clone())?;
// Need a single "input" row to produce a single output row
let col = new_null_array(&DataType::Null, 1);
- let input_batch =
- RecordBatch::try_new(std::sync::Arc::new(schema), vec![col]).unwrap();
+ let input_batch = RecordBatch::try_new(std::sync::Arc::new(schema), vec![col])?;
- Self {
+ Ok(Self {
can_evaluate: vec![],
execution_props,
input_schema,
input_batch,
- }
+ })
}
/// Can a function of the specified volatility be evaluated?
@@ -1273,7 +1271,7 @@ mod tests {
var_providers: None,
};
- let mut const_evaluator = ConstEvaluator::new(&execution_props);
+ let mut const_evaluator = ConstEvaluator::try_new(&execution_props).unwrap();
let evaluated_expr = input_expr
.clone()
.rewrite(&mut const_evaluator)
diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs
index f84a5fbb9..010b61a9b 100644
--- a/datafusion/physical-expr/src/planner.rs
+++ b/datafusion/physical-expr/src/planner.rs
@@ -32,13 +32,22 @@ use datafusion_expr::binary_rule::comparison_coercion;
use datafusion_expr::{Expr, Operator};
use std::sync::Arc;
-/// Create a physical expression from a logical expression ([Expr])
+/// Create a physical expression from a logical expression ([Expr]).
+///
+/// # Arguments
+///
+/// * `e` - The logical expression
+/// * `input_dfschema` - The DataFusion schema for the input, used to resolve `Column` references
+/// to qualified or unqualified fields by name.
+/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
+/// when performing type coercion.
pub fn create_physical_expr(
e: &Expr,
input_dfschema: &DFSchema,
input_schema: &Schema,
execution_props: &ExecutionProps,
) -> Result<Arc<dyn PhysicalExpr>> {
+ assert_eq!(input_schema.fields.len(), input_dfschema.fields().len());
match e {
Expr::Alias(expr, ..) => Ok(create_physical_expr(
expr,