Posted to commits@arrow.apache.org by al...@apache.org on 2023/12/08 21:44:38 UTC
(arrow-datafusion) branch main updated: Add `evaluate_demo` and `range_analysis_demo` to Expr examples (#8377)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 91cc573d89 Add `evaluate_demo` and `range_analysis_demo` to Expr examples (#8377)
91cc573d89 is described below
commit 91cc573d89fbcf9342b968760a4d0f9a47072527
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Fri Dec 8 16:44:32 2023 -0500
Add `evaluate_demo` and `range_analysis_demo` to Expr examples (#8377)
* Add `evaluate_demo` and `range_analysis_demo` to Expr examples
* Prettier
* Update datafusion-examples/examples/expr_api.rs
Co-authored-by: Raphael Taylor-Davies <17...@users.noreply.github.com>
* rename ExprBoundaries::try_new_unknown --> ExprBoundaries::try_new_unbounded
---------
Co-authored-by: Raphael Taylor-Davies <17...@users.noreply.github.com>
---
datafusion-examples/README.md | 2 +-
datafusion-examples/examples/expr_api.rs | 144 +++++++++++++++++++--
datafusion/core/src/lib.rs | 10 +-
datafusion/physical-expr/src/analysis.rs | 25 +++-
.../library-user-guide/working-with-exprs.md | 13 +-
5 files changed, 174 insertions(+), 20 deletions(-)
diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md
index 9f7c9f99d1..305422ccd0 100644
--- a/datafusion-examples/README.md
+++ b/datafusion-examples/README.md
@@ -50,7 +50,7 @@ cargo run --example csv_sql
- [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run a query using a DataFrame against a parquet file from s3
- [`dataframe_in_memory.rs`](examples/dataframe_in_memory.rs): Run a query using a DataFrame against data in memory
- [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results into rust structs using serde
-- [`expr_api.rs`](examples/expr_api.rs): Use the `Expr` construction and simplification API
+- [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify, and analyze `Expr`s
- [`flight_sql_server.rs`](examples/flight/flight_sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from JDBC clients
- [`memtable.rs`](examples/memtable.rs): Create and query data in memory using SQL and `RecordBatch`es
- [`parquet_sql.rs`](examples/parquet_sql.rs): Build and run a query plan from a SQL statement against a local Parquet file
diff --git a/datafusion-examples/examples/expr_api.rs b/datafusion-examples/examples/expr_api.rs
index 97abf4d552..715e1ff2dc 100644
--- a/datafusion-examples/examples/expr_api.rs
+++ b/datafusion-examples/examples/expr_api.rs
@@ -15,28 +15,43 @@
// specific language governing permissions and limitations
// under the License.
+use arrow::array::{BooleanArray, Int32Array};
+use arrow::record_batch::RecordBatch;
use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::error::Result;
use datafusion::optimizer::simplify_expressions::{ExprSimplifier, SimplifyContext};
use datafusion::physical_expr::execution_props::ExecutionProps;
+use datafusion::physical_expr::{
+ analyze, create_physical_expr, AnalysisContext, ExprBoundaries, PhysicalExpr,
+};
use datafusion::prelude::*;
use datafusion_common::{ScalarValue, ToDFSchema};
use datafusion_expr::expr::BinaryExpr;
-use datafusion_expr::Operator;
+use datafusion_expr::interval_arithmetic::Interval;
+use datafusion_expr::{ColumnarValue, ExprSchemable, Operator};
+use std::sync::Arc;
/// This example demonstrates the DataFusion [`Expr`] API.
///
/// DataFusion comes with a powerful and extensive system for
/// representing and manipulating expressions such as `A + 5` and `X
-/// IN ('foo', 'bar', 'baz')` and many other constructs.
+/// IN ('foo', 'bar', 'baz')`.
+///
+/// In addition to building and manipulating [`Expr`]s, DataFusion
+/// also comes with APIs for evaluation, simplification, and analysis.
+///
+/// The code in this example shows how to:
+/// 1. Create [`Exprs`] using different APIs: [`main`]
+/// 2. Evaluate [`Exprs`] against data: [`evaluate_demo`]
+/// 3. Simplify expressions: [`simplify_demo`]
+/// 4. Analyze predicates for boundary ranges: [`range_analysis_demo`]
#[tokio::main]
async fn main() -> Result<()> {
// The easiest way to create expressions is to use the
- // "fluent"-style API, like this:
+ // "fluent"-style API:
let expr = col("a") + lit(5);
- // this creates the same expression as the following though with
- // much less code,
+ // The same expression can be created directly, though with much more code:
let expr2 = Expr::BinaryExpr(BinaryExpr::new(
Box::new(col("a")),
Operator::Plus,
@@ -44,15 +59,51 @@ async fn main() -> Result<()> {
));
assert_eq!(expr, expr2);
+ // See how to evaluate expressions
+ evaluate_demo()?;
+
+ // See how to simplify expressions
simplify_demo()?;
+ // See how to analyze ranges in expressions
+ range_analysis_demo()?;
+
+ Ok(())
+}
+
+/// DataFusion can also evaluate arbitrary expressions on Arrow arrays.
+fn evaluate_demo() -> Result<()> {
+ // For example, let's say you have some integers in an array
+ let batch = RecordBatch::try_from_iter([(
+ "a",
+ Arc::new(Int32Array::from(vec![4, 5, 6, 7, 8, 7, 4])) as _,
+ )])?;
+
+ // If you want to find all rows where the expression `a < 5 OR a = 8` is true
+ let expr = col("a").lt(lit(5)).or(col("a").eq(lit(8)));
+
+ // First, you make a "physical expression" from the logical `Expr`
+ let physical_expr = physical_expr(&batch.schema(), expr)?;
+
+ // Now, you can evaluate the expression against the RecordBatch
+ let result = physical_expr.evaluate(&batch)?;
+
+ // The result contains an array that is true only where `a < 5 OR a = 8`
+ let expected_result = Arc::new(BooleanArray::from(vec![
+ true, false, false, false, true, false, true,
+ ])) as _;
+ assert!(
+ matches!(&result, ColumnarValue::Array(r) if r == &expected_result),
+ "result: {:?}",
+ result
+ );
+
Ok(())
}
-/// In addition to easy construction, DataFusion exposes APIs for
-/// working with and simplifying such expressions that call into the
-/// same powerful and extensive implementation used for the query
-/// engine.
+/// In addition to easy construction, DataFusion exposes APIs for simplifying
+/// such expressions so they are more efficient to evaluate. This code is also
+/// used by the query engine to optimize queries.
fn simplify_demo() -> Result<()> {
// For example, let's say you have created an expression such as
// ts = to_timestamp("2020-09-08T12:00:00+00:00")
@@ -94,7 +145,7 @@ fn simplify_demo() -> Result<()> {
make_field("b", DataType::Boolean),
])
.to_dfschema_ref()?;
- let context = SimplifyContext::new(&props).with_schema(schema);
+ let context = SimplifyContext::new(&props).with_schema(schema.clone());
let simplifier = ExprSimplifier::new(context);
// basic arithmetic simplification
@@ -120,6 +171,64 @@ fn simplify_demo() -> Result<()> {
col("i").lt(lit(10))
);
+ // String --> Date simplification
+ // `cast('2020-09-01' as date)` --> 18506
+ assert_eq!(
+ simplifier.simplify(lit("2020-09-01").cast_to(&DataType::Date32, &schema)?)?,
+ lit(ScalarValue::Date32(Some(18506)))
+ );
+
+ Ok(())
+}
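The simplifier folds the cast down to the literal `18506`, which is the Arrow `Date32` encoding of `2020-09-01`: the number of days since the Unix epoch (1970-01-01). A minimal, DataFusion-free sketch of how those day counts arise (the helper names here are hypothetical, written only for illustration):

```rust
/// Gregorian leap-year rule.
fn is_leap(y: i32) -> bool {
    (y % 4 == 0 && y % 100 != 0) || y % 400 == 0
}

/// Count days from 1970-01-01 to the given date, mirroring Arrow's
/// `Date32` encoding. A toy helper, not part of DataFusion's API.
fn days_since_epoch(y: i32, m: u32, d: u32) -> i32 {
    const MONTH_DAYS: [u32; 12] = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31];
    let mut days: i32 = 0;
    // Whole years before the target year
    for year in 1970..y {
        days += if is_leap(year) { 366 } else { 365 };
    }
    // Whole months before the target month, accounting for leap February
    for month in 1..m {
        days += MONTH_DAYS[(month - 1) as usize] as i32;
        if month == 2 && is_leap(y) {
            days += 1;
        }
    }
    days + (d as i32 - 1)
}

fn main() {
    // The two dates used by the example below
    assert_eq!(days_since_epoch(2020, 9, 1), 18506);
    assert_eq!(days_since_epoch(2020, 10, 1), 18536);
}
```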
+
+/// DataFusion also has APIs for analyzing predicates (boolean expressions) to
+/// determine any range restrictions on the inputs required for the predicate
+/// to evaluate to true.
+fn range_analysis_demo() -> Result<()> {
+ // For example, let's say you are interested in finding data for all days
+ // in the month of September, 2020
+ let september_1 = ScalarValue::Date32(Some(18506)); // 2020-09-01
+ let october_1 = ScalarValue::Date32(Some(18536)); // 2020-10-01
+
+ // The predicate to find all such days could be
+ // `date > '2020-09-01' AND date < '2020-10-01'`
+ let expr = col("date")
+ .gt(lit(september_1.clone()))
+ .and(col("date").lt(lit(october_1.clone())));
+
+ // Using the analysis API, DataFusion can determine that the value of `date`
+ // must be in the range `['2020-09-01', '2020-10-01']`. If your data is
+ // organized in files according to day, this information permits skipping
+ // entire files without reading them.
+ //
+ // While this simple example could be handled with a special case, the
+ // DataFusion API handles arbitrary expressions (so for example, you don't
+ // have to handle the case where the predicate clauses are reversed such as
+ // `date < '2020-10-01' AND date > '2020-09-01'`).
+
+ // As always, we need to tell DataFusion the type of column "date"
+ let schema = Schema::new(vec![make_field("date", DataType::Date32)]);
+
+ // You can provide DataFusion any known boundaries on the values of `date`
+ // (for example, maybe you know you only have data up to `2020-09-15`), but
+ // in this case, let's say we don't know any boundaries beforehand so we use
+ // `try_new_unbounded`
+ let boundaries = ExprBoundaries::try_new_unbounded(&schema)?;
+
+ // Now, we invoke the analysis code to perform the range analysis
+ let physical_expr = physical_expr(&schema, expr)?;
+ let analysis_result =
+ analyze(&physical_expr, AnalysisContext::new(boundaries), &schema)?;
+
+ // The result of the analysis is a range, encoded as an `Interval`, for
+ // each column in the schema, that must hold in order for the predicate
+ // to be true.
+ //
+ // Here we can see that, as expected, `analyze` has figured out that
+ // `date` must be in the range `['2020-09-01', '2020-10-01']`
+ let expected_range = Interval::try_new(september_1, october_1)?;
+ assert_eq!(analysis_result.boundaries[0].interval, expected_range);
+
Ok(())
}
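The narrowing that `analyze` performs on this predicate can be sketched with a toy interval type. This `Interval` is a stand-in for illustration only, not DataFusion's `datafusion_expr::interval_arithmetic::Interval`: AND-ing `col > v` and `col < v'` tightens the lower and upper bounds of an initially unbounded interval, regardless of the order of the clauses.

```rust
/// A toy one-column interval: `None` means unbounded on that side.
#[derive(Debug, Clone, PartialEq)]
struct Interval {
    lower: Option<i32>,
    upper: Option<i32>,
}

impl Interval {
    fn unbounded() -> Self {
        Interval { lower: None, upper: None }
    }
    /// `col > v` can only raise the lower bound.
    fn apply_gt(&mut self, v: i32) {
        self.lower = Some(self.lower.map_or(v, |l| l.max(v)));
    }
    /// `col < v` can only lower the upper bound.
    fn apply_lt(&mut self, v: i32) {
        self.upper = Some(self.upper.map_or(v, |u| u.min(v)));
    }
}

fn main() {
    // `date > '2020-09-01' AND date < '2020-10-01'` as Date32 day counts
    let mut date = Interval::unbounded();
    date.apply_gt(18506);
    date.apply_lt(18536);
    assert_eq!(date, Interval { lower: Some(18506), upper: Some(18536) });

    // As noted above, reversing the clause order gives the same range
    let mut reversed = Interval::unbounded();
    reversed.apply_lt(18536);
    reversed.apply_gt(18506);
    assert_eq!(date, reversed);
}
```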
@@ -132,3 +241,18 @@ fn make_ts_field(name: &str) -> Field {
let tz = None;
make_field(name, DataType::Timestamp(TimeUnit::Nanosecond, tz))
}
+
+/// Build a physical expression from a logical one, after applying simplification and type coercion
+pub fn physical_expr(schema: &Schema, expr: Expr) -> Result<Arc<dyn PhysicalExpr>> {
+ let df_schema = schema.clone().to_dfschema_ref()?;
+
+ // Simplify
+ let props = ExecutionProps::new();
+ let simplifier =
+ ExprSimplifier::new(SimplifyContext::new(&props).with_schema(df_schema.clone()));
+
+ // apply type coercion here to ensure types match
+ let expr = simplifier.coerce(expr, df_schema.clone())?;
+
+ create_physical_expr(&expr, df_schema.as_ref(), schema, &props)
+}
diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index bf9a4abf4f..b3ebbc6e36 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -283,12 +283,20 @@
//!
//! ## Plan Representations
//!
-//! Logical planning yields [`LogicalPlan`]s nodes and [`Expr`]
+//! ### Logical Plans
+//! Logical planning yields [`LogicalPlan`] nodes and [`Expr`]
//! expressions which are [`Schema`] aware and represent statements
//! independent of how they are physically executed.
//! A [`LogicalPlan`] is a Directed Acyclic Graph (DAG) of other
//! [`LogicalPlan`]s, each potentially containing embedded [`Expr`]s.
//!
+//! Examples of working with and executing `Expr`s can be found in the
+//! [`expr_api`.rs] example
+//!
+//! [`expr_api`.rs]: https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/expr_api.rs
+//!
+//! ### Physical Plans
+//!
//! An [`ExecutionPlan`] (sometimes referred to as a "physical plan")
//! is a plan that can be executed against data. It is a DAG of other
//! [`ExecutionPlan`]s each potentially containing expressions of the
diff --git a/datafusion/physical-expr/src/analysis.rs b/datafusion/physical-expr/src/analysis.rs
index dc12bdf46a..f43434362a 100644
--- a/datafusion/physical-expr/src/analysis.rs
+++ b/datafusion/physical-expr/src/analysis.rs
@@ -72,8 +72,12 @@ impl AnalysisContext {
}
}
-/// Represents the boundaries of the resulting value from a physical expression,
-/// if it were to be an expression, if it were to be evaluated.
+/// Represents the boundaries (e.g. min and max values) of a particular column
+///
+/// This is used during range analysis of expressions, to determine if the expression
+/// limits the value of particular columns (e.g. analyzing an expression such as
+/// `time < 50` would result in a boundary interval for `time` having a max
+/// value of `50`).
#[derive(Clone, Debug, PartialEq)]
pub struct ExprBoundaries {
pub column: Column,
@@ -111,6 +115,23 @@ impl ExprBoundaries {
distinct_count: col_stats.distinct_count.clone(),
})
}
+
+ /// Create `ExprBoundaries` that represent no known bounds for all the
+ /// columns in `schema`
+ pub fn try_new_unbounded(schema: &Schema) -> Result<Vec<Self>> {
+ schema
+ .fields()
+ .iter()
+ .enumerate()
+ .map(|(i, field)| {
+ Ok(Self {
+ column: Column::new(field.name(), i),
+ interval: Interval::make_unbounded(field.data_type())?,
+ distinct_count: Precision::Absent,
+ })
+ })
+ .collect()
+ }
}
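As the example's comments note, a derived boundary like `date` in `['2020-09-01', '2020-10-01']` lets an engine skip files whose min/max statistics cannot overlap the required range. A self-contained sketch of that pruning step, with made-up file names and statistics (not DataFusion code):

```rust
/// Hypothetical per-file statistics, as Date32 day counts.
struct FileStats {
    name: &'static str,
    min_date: i32,
    max_date: i32,
}

/// A file can contain matching rows only if its [min, max] range
/// overlaps the range the predicate requires.
fn overlaps(file: &FileStats, lower: i32, upper: i32) -> bool {
    file.max_date >= lower && file.min_date <= upper
}

fn main() {
    // One hypothetical file per month
    let files = [
        FileStats { name: "2020-08.parquet", min_date: 18475, max_date: 18505 },
        FileStats { name: "2020-09.parquet", min_date: 18506, max_date: 18535 },
        FileStats { name: "2020-10.parquet", min_date: 18536, max_date: 18566 },
    ];

    // Range derived by the analysis: `['2020-09-01', '2020-10-01']`
    let (lower, upper) = (18506, 18536);
    let kept: Vec<&str> = files
        .iter()
        .filter(|f| overlaps(f, lower, upper))
        .map(|f| f.name)
        .collect();

    // The August file is skipped without being read
    assert_eq!(kept, vec!["2020-09.parquet", "2020-10.parquet"]);
}
```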
/// Attempts to refine column boundaries and compute a selectivity value.
diff --git a/docs/source/library-user-guide/working-with-exprs.md b/docs/source/library-user-guide/working-with-exprs.md
index a8baf24d5f..96be8ef7f1 100644
--- a/docs/source/library-user-guide/working-with-exprs.md
+++ b/docs/source/library-user-guide/working-with-exprs.md
@@ -17,7 +17,7 @@
under the License.
-->
-# Working with Exprs
+# Working with `Expr`s
<!-- https://github.com/apache/arrow-datafusion/issues/7304 -->
@@ -48,12 +48,11 @@ As another example, the SQL expression `a + b * c` would be represented as an `E
└────────────────────┘ └────────────────────┘
```
-As the writer of a library, you may want to use or create `Expr`s to represent computations that you want to perform. This guide will walk you through how to make your own scalar UDF as an `Expr` and how to rewrite `Expr`s to inline the simple UDF.
+As the writer of a library, you can use `Expr`s to represent computations that you want to perform. This guide will walk you through how to make your own scalar UDF as an `Expr` and how to rewrite `Expr`s to inline the simple UDF.
-There are also executable examples for working with `Expr`s:
+## Creating and Evaluating `Expr`s
-- [rewrite_expr.rs](https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/rewrite_expr.rs)
-- [expr_api.rs](https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/expr_api.rs)
+Please see [expr_api.rs](https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/expr_api.rs) for well-commented code for creating, evaluating, simplifying, and analyzing `Expr`s.
## A Scalar UDF Example
@@ -79,7 +78,9 @@ let expr = add_one_udf.call(vec![col("my_column")]);
If you'd like to learn more about `Expr`s, before we get into the details of creating and rewriting them, you can read the [expression user-guide](./../user-guide/expressions.md).
-## Rewriting Exprs
+## Rewriting `Expr`s
+
+[rewrite_expr.rs](https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/rewrite_expr.rs) contains example code for rewriting `Expr`s.
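The general shape of such a rewrite — walk the tree and rebuild the nodes you want to change — can be sketched on a toy expression type. This `Expr` enum is a hypothetical stand-in, not DataFusion's `Expr` (which provides its own tree-walking helpers), but the transformation has the same structure:

```rust
/// A toy expression tree, for illustration only.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Literal(i64),
    Add(Box<Expr>, Box<Expr>),
}

/// Rewrite `x + 0` (and `0 + x`) to `x`, recursing through the tree.
fn rewrite(expr: Expr) -> Expr {
    match expr {
        Expr::Add(l, r) => {
            // Rewrite children first, then this node
            let l = rewrite(*l);
            let r = rewrite(*r);
            match (l, r) {
                (e, Expr::Literal(0)) | (Expr::Literal(0), e) => e,
                (l, r) => Expr::Add(Box::new(l), Box::new(r)),
            }
        }
        other => other,
    }
}

fn main() {
    // (a + 0) + 5  becomes  a + 5
    let e = Expr::Add(
        Box::new(Expr::Add(
            Box::new(Expr::Column("a".to_string())),
            Box::new(Expr::Literal(0)),
        )),
        Box::new(Expr::Literal(5)),
    );
    assert_eq!(
        rewrite(e),
        Expr::Add(
            Box::new(Expr::Column("a".to_string())),
            Box::new(Expr::Literal(5)),
        )
    );
}
```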
Rewriting Expressions is the process of taking an `Expr` and transforming it into another `Expr`. This is useful for a number of reasons, including: