Posted to commits@arrow.apache.org by al...@apache.org on 2023/12/08 21:44:38 UTC

(arrow-datafusion) branch main updated: Add `evaluate_demo` and `range_analysis_demo` to Expr examples (#8377)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 91cc573d89 Add `evaluate_demo` and `range_analysis_demo` to Expr examples (#8377)
91cc573d89 is described below

commit 91cc573d89fbcf9342b968760a4d0f9a47072527
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Fri Dec 8 16:44:32 2023 -0500

    Add `evaluate_demo` and `range_analysis_demo` to Expr examples (#8377)
    
    * Add `evaluate_demo` and `range_analysis_demo` to Expr examples
    
    * Prettier
    
    * Update datafusion-examples/examples/expr_api.rs
    
    Co-authored-by: Raphael Taylor-Davies <17...@users.noreply.github.com>
    
    * rename ExprBoundaries::try_new_unknown --> ExprBoundaries::try_new_unbounded
    
    ---------
    
    Co-authored-by: Raphael Taylor-Davies <17...@users.noreply.github.com>
---
 datafusion-examples/README.md                      |   2 +-
 datafusion-examples/examples/expr_api.rs           | 144 +++++++++++++++++++--
 datafusion/core/src/lib.rs                         |  10 +-
 datafusion/physical-expr/src/analysis.rs           |  25 +++-
 .../library-user-guide/working-with-exprs.md       |  13 +-
 5 files changed, 174 insertions(+), 20 deletions(-)
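
Note on the range analysis added in `range_analysis_demo`: the idea of
narrowing a column's bounds from a conjunction of comparisons can be sketched
with the standard library alone. The `Range` and `Cmp` types and the
`analyze_conjunction` function below are hypothetical names invented for this
illustration; they are not DataFusion's `Interval`, `ExprBoundaries`, or
`analyze`, and the real implementation handles arbitrary expressions. The
sketch assumes strict comparisons are kept as closed endpoints, mirroring the
closed range that the example in this commit asserts.

```rust
/// Closed range over `i32` values; `None` means unbounded on that side.
/// Hypothetical type for illustration, not DataFusion's `Interval`.
#[derive(Clone, Copy, Debug, PartialEq)]
struct Range {
    lower: Option<i32>,
    upper: Option<i32>,
}

/// One comparison of a single column against a literal (hypothetical).
#[derive(Clone, Copy, Debug)]
enum Cmp {
    Gt(i32),
    Lt(i32),
}

impl Range {
    /// No known bounds, similar in spirit to starting from unbounded boundaries.
    fn unbounded() -> Self {
        Range { lower: None, upper: None }
    }

    /// Narrow the range with one conjunct, keeping the tightest bound seen.
    /// Strict bounds are stored as closed endpoints (a conservative superset).
    fn narrow(self, cmp: Cmp) -> Self {
        match cmp {
            Cmp::Gt(v) => Range {
                lower: Some(self.lower.map_or(v, |l| l.max(v))),
                ..self
            },
            Cmp::Lt(v) => Range {
                upper: Some(self.upper.map_or(v, |u| u.min(v))),
                ..self
            },
        }
    }
}

/// Fold a conjunction (`AND`) of comparisons into a single range.
/// The order of the conjuncts does not affect the result.
fn analyze_conjunction(conjuncts: &[Cmp]) -> Range {
    conjuncts
        .iter()
        .fold(Range::unbounded(), |range, cmp| range.narrow(*cmp))
}

fn main() {
    let september_1 = 18506; // 2020-09-01 as days since 1970-01-01
    let october_1 = 18536; // 2020-10-01 as days since 1970-01-01

    // `date > '2020-09-01' AND date < '2020-10-01'`
    let range = analyze_conjunction(&[Cmp::Gt(september_1), Cmp::Lt(october_1)]);
    println!("{:?}", range);
}
```

Because each conjunct only tightens one side of the range, reversing the
clauses gives the same answer, which is the property the example comments in
`range_analysis_demo` point out.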

diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md
index 9f7c9f99d1..305422ccd0 100644
--- a/datafusion-examples/README.md
+++ b/datafusion-examples/README.md
@@ -50,7 +50,7 @@ cargo run --example csv_sql
 - [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run a query using a DataFrame against a parquet file from s3
 - [`dataframe_in_memory.rs`](examples/dataframe_in_memory.rs): Run a query using a DataFrame against data in memory
 - [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results into rust structs using serde
-- [`expr_api.rs`](examples/expr_api.rs): Use the `Expr` construction and simplification API
+- [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify and analyze `Expr`s
 - [`flight_sql_server.rs`](examples/flight/flight_sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from JDBC clients
 - [`memtable.rs`](examples/memtable.rs): Create an query data in memory using SQL and `RecordBatch`es
 - [`parquet_sql.rs`](examples/parquet_sql.rs): Build and run a query plan from a SQL statement against a local Parquet file
diff --git a/datafusion-examples/examples/expr_api.rs b/datafusion-examples/examples/expr_api.rs
index 97abf4d552..715e1ff2dc 100644
--- a/datafusion-examples/examples/expr_api.rs
+++ b/datafusion-examples/examples/expr_api.rs
@@ -15,28 +15,43 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use arrow::array::{BooleanArray, Int32Array};
+use arrow::record_batch::RecordBatch;
 use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
 use datafusion::error::Result;
 use datafusion::optimizer::simplify_expressions::{ExprSimplifier, SimplifyContext};
 use datafusion::physical_expr::execution_props::ExecutionProps;
+use datafusion::physical_expr::{
+    analyze, create_physical_expr, AnalysisContext, ExprBoundaries, PhysicalExpr,
+};
 use datafusion::prelude::*;
 use datafusion_common::{ScalarValue, ToDFSchema};
 use datafusion_expr::expr::BinaryExpr;
-use datafusion_expr::Operator;
+use datafusion_expr::interval_arithmetic::Interval;
+use datafusion_expr::{ColumnarValue, ExprSchemable, Operator};
+use std::sync::Arc;
 
 /// This example demonstrates the DataFusion [`Expr`] API.
 ///
 /// DataFusion comes with a powerful and extensive system for
 /// representing and manipulating expressions such as `A + 5` and `X
-/// IN ('foo', 'bar', 'baz')` and many other constructs.
+/// IN ('foo', 'bar', 'baz')`.
+///
+/// In addition to building and manipulating [`Expr`]s, DataFusion
+/// also comes with APIs for evaluation, simplification, and analysis.
+///
+/// The code in this example shows how to:
+/// 1. Create [`Exprs`] using different APIs: [`main`]
+/// 2. Evaluate [`Exprs`] against data: [`evaluate_demo`]
+/// 3. Simplify expressions: [`simplify_demo`]
+/// 4. Analyze predicates for boundary ranges: [`range_analysis_demo`]
 #[tokio::main]
 async fn main() -> Result<()> {
     // The easiest way to do create expressions is to use the
-    // "fluent"-style API, like this:
+    // "fluent"-style API:
     let expr = col("a") + lit(5);
 
-    // this creates the same expression as the following though with
-    // much less code,
+    // The same expression can be created directly, with much more code:
     let expr2 = Expr::BinaryExpr(BinaryExpr::new(
         Box::new(col("a")),
         Operator::Plus,
@@ -44,15 +59,51 @@ async fn main() -> Result<()> {
     ));
     assert_eq!(expr, expr2);
 
+    // See how to evaluate expressions
+    evaluate_demo()?;
+
+    // See how to simplify expressions
     simplify_demo()?;
 
+    // See how to analyze ranges in expressions
+    range_analysis_demo()?;
+
+    Ok(())
+}
+
+/// DataFusion can also evaluate arbitrary expressions on Arrow arrays.
+fn evaluate_demo() -> Result<()> {
+    // For example, let's say you have some integers in an array
+    let batch = RecordBatch::try_from_iter([(
+        "a",
+        Arc::new(Int32Array::from(vec![4, 5, 6, 7, 8, 7, 4])) as _,
+    )])?;
+
+    // If you want to find all rows where the expression `a < 5 OR a = 8` is true
+    let expr = col("a").lt(lit(5)).or(col("a").eq(lit(8)));
+
+    // First, you make a "physical expression" from the logical `Expr`
+    let physical_expr = physical_expr(&batch.schema(), expr)?;
+
+    // Now, you can evaluate the expression against the RecordBatch
+    let result = physical_expr.evaluate(&batch)?;
+
+    // The result contains an array that is true only where `a < 5 OR a = 8`
+    let expected_result = Arc::new(BooleanArray::from(vec![
+        true, false, false, false, true, false, true,
+    ])) as _;
+    assert!(
+        matches!(&result, ColumnarValue::Array(r) if r == &expected_result),
+        "result: {:?}",
+        result
+    );
+
     Ok(())
 }
 
-/// In addition to easy construction, DataFusion exposes APIs for
-/// working with and simplifying such expressions that call into the
-/// same powerful and extensive implementation used for the query
-/// engine.
+/// In addition to easy construction, DataFusion exposes APIs for simplifying
+/// such expressions so they are more efficient to evaluate. This code is also
+/// used by the query engine to optimize queries.
 fn simplify_demo() -> Result<()> {
     // For example, lets say you have has created an expression such
     // ts = to_timestamp("2020-09-08T12:00:00+00:00")
@@ -94,7 +145,7 @@ fn simplify_demo() -> Result<()> {
         make_field("b", DataType::Boolean),
     ])
     .to_dfschema_ref()?;
-    let context = SimplifyContext::new(&props).with_schema(schema);
+    let context = SimplifyContext::new(&props).with_schema(schema.clone());
     let simplifier = ExprSimplifier::new(context);
 
     // basic arithmetic simplification
@@ -120,6 +171,64 @@ fn simplify_demo() -> Result<()> {
         col("i").lt(lit(10))
     );
 
+    // String --> Date simplification
+    // `cast('2020-09-01' as date)` --> 18506
+    assert_eq!(
+        simplifier.simplify(lit("2020-09-01").cast_to(&DataType::Date32, &schema)?)?,
+        lit(ScalarValue::Date32(Some(18506)))
+    );
+
+    Ok(())
+}
+
+/// DataFusion also has APIs for analyzing predicates (boolean expressions) to
+/// determine any range restrictions on the inputs required for the predicate
+/// to evaluate to true.
+fn range_analysis_demo() -> Result<()> {
+    // For example, let's say you are interested in finding data for all days
+    // in the month of September, 2020
+    let september_1 = ScalarValue::Date32(Some(18506)); // 2020-09-01
+    let october_1 = ScalarValue::Date32(Some(18536)); // 2020-10-01
+
+    // The predicate to find all such days could be
+    // `date > '2020-09-01' AND date < '2020-10-01'`
+    let expr = col("date")
+        .gt(lit(september_1.clone()))
+        .and(col("date").lt(lit(october_1.clone())));
+
+    // Using the analysis API, DataFusion can determine that the value of `date`
+    // must be in the range `['2020-09-01', '2020-10-01']`. If your data is
+    // organized in files according to day, this information permits skipping
+    // entire files without reading them.
+    //
+    // While this simple example could be handled with a special case, the
+    // DataFusion API handles arbitrary expressions (so, for example, you don't
+    // have to handle the case where the predicate clauses are reversed, such as
+    // `date < '2020-10-01' AND date > '2020-09-01'`).
+
+    // As always, we need to tell DataFusion the type of column "date"
+    let schema = Schema::new(vec![make_field("date", DataType::Date32)]);
+
+    // You can provide DataFusion any known boundaries on the values of `date`
+    // (for example, maybe you know you only have data up to `2020-09-15`), but
+    // in this case, let's say we don't know any boundaries beforehand so we use
+    // `try_new_unbounded`
+    let boundaries = ExprBoundaries::try_new_unbounded(&schema)?;
+
+    // Now, we invoke the analysis code to perform the range analysis
+    let physical_expr = physical_expr(&schema, expr)?;
+    let analysis_result =
+        analyze(&physical_expr, AnalysisContext::new(boundaries), &schema)?;
+
+    // The result of the analysis is a range, encoded as an `Interval`, for
+    // each column in the schema, that must hold in order for the predicate
+    // to be true.
+    //
+    // As expected, `analyze` has figured out that `date` must be in the
+    // range `['2020-09-01', '2020-10-01']`
+    let expected_range = Interval::try_new(september_1, october_1)?;
+    assert_eq!(analysis_result.boundaries[0].interval, expected_range);
+
     Ok(())
 }
 
@@ -132,3 +241,18 @@ fn make_ts_field(name: &str) -> Field {
     let tz = None;
     make_field(name, DataType::Timestamp(TimeUnit::Nanosecond, tz))
 }
+
+/// Build a physical expression from a logical one, after applying simplification and type coercion
+pub fn physical_expr(schema: &Schema, expr: Expr) -> Result<Arc<dyn PhysicalExpr>> {
+    let df_schema = schema.clone().to_dfschema_ref()?;
+
+    // Simplify
+    let props = ExecutionProps::new();
+    let simplifier =
+        ExprSimplifier::new(SimplifyContext::new(&props).with_schema(df_schema.clone()));
+
+    // apply type coercion here to ensure types match
+    let expr = simplifier.coerce(expr, df_schema.clone())?;
+
+    create_physical_expr(&expr, df_schema.as_ref(), schema, &props)
+}
diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index bf9a4abf4f..b3ebbc6e36 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -283,12 +283,20 @@
 //!
 //! ## Plan Representations
 //!
-//! Logical planning yields [`LogicalPlan`]s nodes and [`Expr`]
+//! ### Logical Plans
+//! Logical planning yields [`LogicalPlan`] nodes and [`Expr`]
 //! expressions which are [`Schema`] aware and represent statements
 //! independent of how they are physically executed.
 //! A [`LogicalPlan`] is a Directed Acyclic Graph (DAG) of other
 //! [`LogicalPlan`]s, each potentially containing embedded [`Expr`]s.
 //!
+//! Examples of working with and executing `Expr`s can be found in the
+//! [`expr_api`.rs] example.
+//!
+//! [`expr_api`.rs]: https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/expr_api.rs
+//!
+//! ### Physical Plans
+//!
 //! An [`ExecutionPlan`] (sometimes referred to as a "physical plan")
 //! is a plan that can be executed against data. It a DAG of other
 //! [`ExecutionPlan`]s each potentially containing expressions of the
diff --git a/datafusion/physical-expr/src/analysis.rs b/datafusion/physical-expr/src/analysis.rs
index dc12bdf46a..f43434362a 100644
--- a/datafusion/physical-expr/src/analysis.rs
+++ b/datafusion/physical-expr/src/analysis.rs
@@ -72,8 +72,12 @@ impl AnalysisContext {
     }
 }
 
-/// Represents the boundaries of the resulting value from a physical expression,
-/// if it were to be an expression, if it were to be evaluated.
+/// Represents the boundaries (e.g. min and max values) of a particular column
+///
+/// This is used in range analysis of expressions, to determine if the expression
+/// limits the value of particular columns (e.g. analyzing an expression such as
+/// `time < 50` would result in a boundary interval for `time` having a max
+/// value of `50`).
 #[derive(Clone, Debug, PartialEq)]
 pub struct ExprBoundaries {
     pub column: Column,
@@ -111,6 +115,23 @@ impl ExprBoundaries {
             distinct_count: col_stats.distinct_count.clone(),
         })
     }
+
+    /// Create `ExprBoundaries` that represent no known bounds for all the
+    /// columns in `schema`
+    pub fn try_new_unbounded(schema: &Schema) -> Result<Vec<Self>> {
+        schema
+            .fields()
+            .iter()
+            .enumerate()
+            .map(|(i, field)| {
+                Ok(Self {
+                    column: Column::new(field.name(), i),
+                    interval: Interval::make_unbounded(field.data_type())?,
+                    distinct_count: Precision::Absent,
+                })
+            })
+            .collect()
+    }
 }
 
 /// Attempts to refine column boundaries and compute a selectivity value.
diff --git a/docs/source/library-user-guide/working-with-exprs.md b/docs/source/library-user-guide/working-with-exprs.md
index a8baf24d5f..96be8ef7f1 100644
--- a/docs/source/library-user-guide/working-with-exprs.md
+++ b/docs/source/library-user-guide/working-with-exprs.md
@@ -17,7 +17,7 @@
   under the License.
 -->
 
-# Working with Exprs
+# Working with `Expr`s
 
 <!-- https://github.com/apache/arrow-datafusion/issues/7304 -->
 
@@ -48,12 +48,11 @@ As another example, the SQL expression `a + b * c` would be represented as an `E
                     └────────────────────┘  └────────────────────┘
 ```
 
-As the writer of a library, you may want to use or create `Expr`s to represent computations that you want to perform. This guide will walk you through how to make your own scalar UDF as an `Expr` and how to rewrite `Expr`s to inline the simple UDF.
+As the writer of a library, you can use `Expr`s to represent computations that you want to perform. This guide will walk you through how to make your own scalar UDF as an `Expr` and how to rewrite `Expr`s to inline the simple UDF.
 
-There are also executable examples for working with `Expr`s:
+## Creating and Evaluating `Expr`s
 
-- [rewrite_expr.rs](https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/rewrite_expr.rs)
-- [expr_api.rs](https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/expr_api.rs)
+Please see [expr_api.rs](https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/expr_api.rs) for well-commented code for creating, evaluating, simplifying, and analyzing `Expr`s.
 
 ## A Scalar UDF Example
 
@@ -79,7 +78,9 @@ let expr = add_one_udf.call(vec![col("my_column")]);
 
 If you'd like to learn more about `Expr`s, before we get into the details of creating and rewriting them, you can read the [expression user-guide](./../user-guide/expressions.md).
 
-## Rewriting Exprs
+## Rewriting `Expr`s
+
+[rewrite_expr.rs](https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/rewrite_expr.rs) contains example code for rewriting `Expr`s.
 
 Rewriting Expressions is the process of taking an `Expr` and transforming it into another `Expr`. This is useful for a number of reasons, including: