You are viewing a plain-text version of this content. The canonical version is linked from the original archive page (the hyperlink was not preserved in this plain-text copy).
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/08/25 09:34:27 UTC

[arrow-datafusion] branch master updated: Add assertion for invariant in `create_physical_expression` and fix ViewTable projection (#3242)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 92110ddba Add assertion for invariant in `create_physical_expression` and fix ViewTable projection (#3242)
92110ddba is described below

commit 92110ddba004ca585649d1ba97dcb67f20fc0779
Author: Andy Grove <an...@gmail.com>
AuthorDate: Thu Aug 25 03:34:21 2022 -0600

    Add assertion for invariant in `create_physical_expression` and fix ViewTable projection (#3242)
---
 datafusion/core/src/datasource/view.rs           | 53 +++++++++++++++++++++++-
 datafusion/optimizer/src/expr_simplifier.rs      |  2 +-
 datafusion/optimizer/src/simplify_expressions.rs | 16 ++++---
 datafusion/physical-expr/src/planner.rs          | 11 ++++-
 4 files changed, 69 insertions(+), 13 deletions(-)

diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view.rs
index 1809f2a08..ef7e12aa2 100644
--- a/datafusion/core/src/datasource/view.rs
+++ b/datafusion/core/src/datasource/view.rs
@@ -21,6 +21,7 @@ use std::{any::Any, sync::Arc};
 
 use arrow::datatypes::SchemaRef;
 use async_trait::async_trait;
+use datafusion_expr::LogicalPlanBuilder;
 
 use crate::{
     error::Result,
@@ -81,14 +82,36 @@ impl TableProvider for ViewTable {
     async fn scan(
         &self,
         state: &SessionState,
-        _projection: &Option<Vec<usize>>,
+        projection: &Option<Vec<usize>>,
         _filters: &[Expr],
         _limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // clone state and start_execution so that now() works in views
         let mut state_cloned = state.clone();
         state_cloned.execution_props.start_execution();
-        state_cloned.create_physical_plan(&self.logical_plan).await
+        if let Some(projection) = projection {
+            // avoiding adding a redundant projection (e.g. SELECT * FROM view)
+            let current_projection =
+                (0..self.logical_plan.schema().fields().len()).collect::<Vec<usize>>();
+            if projection == &current_projection {
+                state_cloned.create_physical_plan(&self.logical_plan).await
+            } else {
+                let fields: Vec<Expr> = projection
+                    .iter()
+                    .map(|i| {
+                        Expr::Column(
+                            self.logical_plan.schema().field(*i).qualified_column(),
+                        )
+                    })
+                    .collect();
+                let plan = LogicalPlanBuilder::from(self.logical_plan.clone())
+                    .project(fields)?
+                    .build()?;
+                state_cloned.create_physical_plan(&plan).await
+            }
+        } else {
+            state_cloned.create_physical_plan(&self.logical_plan).await
+        }
     }
 }
 
@@ -99,6 +122,32 @@ mod tests {
 
     use super::*;
 
+    #[tokio::test]
+    async fn issue_3242() -> Result<()> {
+        // regression test for https://github.com/apache/arrow-datafusion/pull/3242
+        let session_ctx = SessionContext::with_config(
+            SessionConfig::new().with_information_schema(true),
+        );
+
+        session_ctx
+            .sql("create view v as select 1 as a, 2 as b, 3 as c")
+            .await?
+            .collect()
+            .await?;
+
+        let results = session_ctx
+            .sql("select * from (select b from v)")
+            .await?
+            .collect()
+            .await?;
+
+        let expected = vec!["+---+", "| b |", "+---+", "| 2 |", "+---+"];
+
+        assert_batches_eq!(expected, &results);
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn create_view_return_empty_dataframe() -> Result<()> {
         let session_ctx = SessionContext::new();
diff --git a/datafusion/optimizer/src/expr_simplifier.rs b/datafusion/optimizer/src/expr_simplifier.rs
index d71ecdaa2..7cf5f02c6 100644
--- a/datafusion/optimizer/src/expr_simplifier.rs
+++ b/datafusion/optimizer/src/expr_simplifier.rs
@@ -88,7 +88,7 @@ impl ExprSimplifiable for Expr {
     /// ```
     fn simplify<S: SimplifyInfo>(self, info: &S) -> Result<Self> {
         let mut rewriter = Simplifier::new(info);
-        let mut const_evaluator = ConstEvaluator::new(info.execution_props());
+        let mut const_evaluator = ConstEvaluator::try_new(info.execution_props())?;
 
         // TODO iterate until no changes are made during rewrite
         // (evaluating constants can enable new simplifications and
diff --git a/datafusion/optimizer/src/simplify_expressions.rs b/datafusion/optimizer/src/simplify_expressions.rs
index 34ec24fb6..384fd09ae 100644
--- a/datafusion/optimizer/src/simplify_expressions.rs
+++ b/datafusion/optimizer/src/simplify_expressions.rs
@@ -332,7 +332,7 @@ impl SimplifyExpressions {
 /// # use datafusion_expr::expr_rewriter::ExprRewritable;
 ///
 /// let execution_props = ExecutionProps::new();
-/// let mut const_evaluator = ConstEvaluator::new(&execution_props);
+/// let mut const_evaluator = ConstEvaluator::try_new(&execution_props).unwrap();
 ///
 /// // (1 + 2) + a
 /// let expr = (lit(1) + lit(2)) + col("a");
@@ -403,25 +403,23 @@ impl<'a> ConstEvaluator<'a> {
     /// Create a new `ConstantEvaluator`. Session constants (such as
     /// the time for `now()` are taken from the passed
     /// `execution_props`.
-    pub fn new(execution_props: &'a ExecutionProps) -> Self {
-        let input_schema = DFSchema::empty();
-
+    pub fn try_new(execution_props: &'a ExecutionProps) -> Result<Self> {
         // The dummy column name is unused and doesn't matter as only
         // expressions without column references can be evaluated
         static DUMMY_COL_NAME: &str = ".";
         let schema = Schema::new(vec![Field::new(DUMMY_COL_NAME, DataType::Null, true)]);
+        let input_schema = DFSchema::try_from(schema.clone())?;
 
         // Need a single "input" row to produce a single output row
         let col = new_null_array(&DataType::Null, 1);
-        let input_batch =
-            RecordBatch::try_new(std::sync::Arc::new(schema), vec![col]).unwrap();
+        let input_batch = RecordBatch::try_new(std::sync::Arc::new(schema), vec![col])?;
 
-        Self {
+        Ok(Self {
             can_evaluate: vec![],
             execution_props,
             input_schema,
             input_batch,
-        }
+        })
     }
 
     /// Can a function of the specified volatility be evaluated?
@@ -1273,7 +1271,7 @@ mod tests {
             var_providers: None,
         };
 
-        let mut const_evaluator = ConstEvaluator::new(&execution_props);
+        let mut const_evaluator = ConstEvaluator::try_new(&execution_props).unwrap();
         let evaluated_expr = input_expr
             .clone()
             .rewrite(&mut const_evaluator)
diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs
index f84a5fbb9..010b61a9b 100644
--- a/datafusion/physical-expr/src/planner.rs
+++ b/datafusion/physical-expr/src/planner.rs
@@ -32,13 +32,22 @@ use datafusion_expr::binary_rule::comparison_coercion;
 use datafusion_expr::{Expr, Operator};
 use std::sync::Arc;
 
-/// Create a physical expression from a logical expression ([Expr])
+/// Create a physical expression from a logical expression ([Expr]).
+///
+/// # Arguments
+///
+/// * `e` - The logical expression
+/// * `input_dfschema` - The DataFusion schema for the input, used to resolve `Column` references
+///                      to qualified or unqualified fields by name.
+/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
+///                    when performing type coercion.
 pub fn create_physical_expr(
     e: &Expr,
     input_dfschema: &DFSchema,
     input_schema: &Schema,
     execution_props: &ExecutionProps,
 ) -> Result<Arc<dyn PhysicalExpr>> {
+    assert_eq!(input_schema.fields.len(), input_dfschema.fields().len());
     match e {
         Expr::Alias(expr, ..) => Ok(create_physical_expr(
             expr,