Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2023/02/28 17:28:14 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #5419: refactor: ParquetExec logical expr. => phys. expr.

alamb commented on code in PR #5419:
URL: https://github.com/apache/arrow-datafusion/pull/5419#discussion_r1120449504


##########
datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs:
##########
@@ -324,12 +329,15 @@ mod tests {
     fn row_group_pruning_predicate_partial_expr() {
         use datafusion_expr::{col, lit};
         // test row group predicate with partially supported expression
-        // int > 1 and int % 2 => c1_max > 1 and true
-        let expr = col("c1").gt(lit(15)).and(col("c2").modulus(lit(2)));
+        // (int > 1) and ((int % 2) = 0) => c1_max > 1 and true

Review Comment:
   thank you - this is a good change
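
   For reference, a minimal sketch (the new expression line is truncated in the diff above, so the exact code may differ) of an expression matching the updated comment, built with the logical `col`/`lit` helpers:

```rust
use datafusion_expr::{col, lit};

// (c1 > 15) AND ((c2 % 2) = 0): the modulus half cannot be used for pruning,
// so it rewrites to `true` in the pruning predicate.
let expr = col("c1").gt(lit(15)).and(col("c2").modulus(lit(2)).eq(lit(0)));
```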



##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -235,6 +239,80 @@ pub fn ordering_satisfy_concrete<F: FnOnce() -> EquivalenceProperties>(
     }
 }
 
+/// Extract referenced [`Column`]s within a [`PhysicalExpr`].
+///
+/// This works recursively.
+pub fn get_phys_expr_columns(pred: &Arc<dyn PhysicalExpr>) -> HashSet<Column> {
+    let mut rewriter = ColumnCollector::default();

Review Comment:
   👍  this is quite clever
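
   A minimal hand-rolled sketch of the same idea (the PR uses a `ColumnCollector` rewriter; this recursive version is only illustrative):

```rust
use std::collections::HashSet;
use std::sync::Arc;

use datafusion_physical_expr::{expressions::Column, PhysicalExpr};

/// Walk the expression tree and collect every `Column` it references.
fn collect_columns(expr: &Arc<dyn PhysicalExpr>, out: &mut HashSet<Column>) {
    if let Some(column) = expr.as_any().downcast_ref::<Column>() {
        out.insert(column.clone());
    }
    for child in expr.children() {
        collect_columns(&child, out);
    }
}
```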



##########
datafusion/core/src/datasource/file_format/mod.rs:
##########
@@ -84,7 +84,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
         &self,
         state: &SessionState,
         conf: FileScanConfig,
-        filters: &[Expr],
+        filters: Option<&Arc<dyn PhysicalExpr>>,

Review Comment:
   I think it is a good change to have this take a single `PhysicalExpr` rather than a slice, as it is more consistent with `TableProvider::scan` 👍 
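
   As a side note, a caller that previously held a `&[Expr]` now needs to conjoin the filters into one expression before planning it; a tiny hypothetical helper (not from this PR) to illustrate:

```rust
use datafusion_expr::Expr;

/// Combine a slice of filter expressions into a single AND conjunction.
fn conjoin(filters: &[Expr]) -> Option<Expr> {
    filters.iter().cloned().reduce(|acc, filter| acc.and(filter))
}
```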



##########
datafusion/core/src/physical_optimizer/pruning.rs:
##########
@@ -187,7 +170,7 @@ impl PruningPredicate {
         // in the predicate means we don't know for sure if the row
         // group can be filtered out or not. To maintain correctness
         // the row group must be kept and thus `true` is returned.
-        match self.predicate_expr.evaluate(&statistics_batch)? {
+        match self.predicate_expr.evaluate(&statistics_batch).expect("bug") {

Review Comment:
   I think it would be nicer here to return the error rather than panic, but I also think panicking is ok
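
   Something along these lines would keep the error, assuming the enclosing method can return a `Result` (hypothetical helper, just to illustrate the suggestion):

```rust
use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_expr::ColumnarValue;
use datafusion_physical_expr::PhysicalExpr;

/// Evaluate the pruning predicate and let the caller handle any error
/// instead of panicking.
fn evaluate_predicate(
    predicate: &Arc<dyn PhysicalExpr>,
    statistics_batch: &RecordBatch,
) -> Result<ColumnarValue> {
    predicate.evaluate(statistics_batch)
}
```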



##########
datafusion/core/src/physical_optimizer/pruning.rs:
##########
@@ -794,51 +853,81 @@ fn build_predicate_expression(
     build_statistics_expr(&mut expr_builder).unwrap_or(unhandled)
 }
 
-fn build_statistics_expr(expr_builder: &mut PruningExpressionBuilder) -> Result<Expr> {
-    let statistics_expr =
+fn build_statistics_expr(
+    expr_builder: &mut PruningExpressionBuilder,
+) -> Result<Arc<dyn PhysicalExpr>> {
+    let statistics_expr: Arc<dyn PhysicalExpr> =
         match expr_builder.op() {
             Operator::NotEq => {
                 // column != literal => (min, max) = literal =>
                 // !(min != literal && max != literal) ==>
                 // min != literal || literal != max
                 let min_column_expr = expr_builder.min_column_expr()?;
                 let max_column_expr = expr_builder.max_column_expr()?;
-                min_column_expr
-                    .not_eq(expr_builder.scalar_expr().clone())
-                    .or(expr_builder.scalar_expr().clone().not_eq(max_column_expr))
+                Arc::new(phys_expr::BinaryExpr::new(
+                    Arc::new(phys_expr::BinaryExpr::new(
+                        min_column_expr,
+                        Operator::NotEq,
+                        expr_builder.scalar_expr().clone(),
+                    )),
+                    Operator::Or,
+                    Arc::new(phys_expr::BinaryExpr::new(
+                        expr_builder.scalar_expr().clone(),
+                        Operator::NotEq,
+                        max_column_expr,
+                    )),
+                ))
             }
             Operator::Eq => {
                 // column = literal => (min, max) = literal => min <= literal && literal <= max
                 // (column / 2) = 4 => (column_min / 2) <= 4 && 4 <= (column_max / 2)
                 let min_column_expr = expr_builder.min_column_expr()?;
                 let max_column_expr = expr_builder.max_column_expr()?;
-                min_column_expr
-                    .lt_eq(expr_builder.scalar_expr().clone())
-                    .and(expr_builder.scalar_expr().clone().lt_eq(max_column_expr))
+                Arc::new(phys_expr::BinaryExpr::new(

Review Comment:
   it is unfortunate that there aren't nicer "fluent"-style APIs for building physical expressions. Maybe we can add a `PhysicalExprExt` trait that knows how to do so.
   
   I'll file a ticket to track doing so in a follow-on PR
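
   A rough sketch of what such a (hypothetical) `PhysicalExprExt` trait could look like:

```rust
use std::sync::Arc;

use datafusion_expr::Operator;
use datafusion_physical_expr::{expressions::BinaryExpr, PhysicalExpr};

/// Hypothetical fluent builder for physical expressions; does not exist upstream yet.
trait PhysicalExprExt {
    fn binary(self, op: Operator, rhs: Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr>;
    fn or(self, rhs: Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr>;
    fn not_eq(self, rhs: Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr>;
}

impl PhysicalExprExt for Arc<dyn PhysicalExpr> {
    fn binary(self, op: Operator, rhs: Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
        Arc::new(BinaryExpr::new(self, op, rhs))
    }
    fn or(self, rhs: Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
        self.binary(Operator::Or, rhs)
    }
    fn not_eq(self, rhs: Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
        self.binary(Operator::NotEq, rhs)
    }
}
```

   With that in place, the `NotEq` arm above could read roughly like the old logical builder style, e.g. `min.not_eq(scalar.clone()).or(scalar.not_eq(max))`.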



##########
datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs:
##########
@@ -211,16 +211,16 @@ impl<'a> FilterCandidateBuilder<'a> {
     }
 }
 
-impl<'a> ExprRewriter for FilterCandidateBuilder<'a> {
-    fn pre_visit(&mut self, expr: &Expr) -> Result<RewriteRecursion> {
-        if let Expr::Column(column) = expr {
-            if let Ok(idx) = self.file_schema.index_of(&column.name) {
+impl<'a> TreeNodeRewriter<Arc<dyn PhysicalExpr>> for FilterCandidateBuilder<'a> {

Review Comment:
   It is cool to see that there are equivalent logical and physical rewriters



##########
datafusion/core/src/physical_optimizer/pruning.rs:
##########
@@ -128,31 +126,16 @@ impl PruningPredicate {
     /// For example, the filter expression `(column / 2) = 4` becomes
     /// the pruning predicate
     /// `(column_min / 2) <= 4 && 4 <= (column_max / 2))`
-    pub fn try_new(expr: Expr, schema: SchemaRef) -> Result<Self> {
+    pub fn try_new(expr: Arc<dyn PhysicalExpr>, schema: SchemaRef) -> Result<Self> {
         // build predicate expression once
         let mut required_columns = RequiredStatColumns::new();
-        let logical_predicate_expr =
+        let predicate_expr =
             build_predicate_expression(&expr, schema.as_ref(), &mut required_columns);
-        let stat_fields = required_columns
-            .iter()
-            .map(|(_, _, f)| f.clone())
-            .collect::<Vec<_>>();
-        let stat_schema = Schema::new(stat_fields);
-        let stat_dfschema = DFSchema::try_from(stat_schema.clone())?;
-
-        // TODO allow these properties to be passed in

Review Comment:
   👍  now it uses the real thing. Nice!



##########
datafusion/core/src/physical_optimizer/pruning.rs:
##########
@@ -1840,12 +2034,34 @@ mod tests {
         (schema, statistics, expected_true, expected_false)
     }
 
+    #[test]
+    fn prune_bool_const_expr() {
+        let (schema, statistics, _, _) = bool_setup();
+
+        // true
+        let expr = lit(true);
+        let expr = logical2physical(&expr, &schema);
+        let p = PruningPredicate::try_new(expr, schema.clone()).unwrap();
+        let result = p.prune(&statistics).unwrap();
+        assert_eq!(result, vec![true, true, true, true, true]);
+
+        // false
+        // constant literals that do NOT refer to any columns are currently not evaluated at all, hence the result is
+        // "all true"
+        let expr = lit(false);

Review Comment:
   this is unfortunate (an all-false predicate should skip the entire file). However, as long as this PR doesn't make the pruning worse in this situation, I think it is fine (and we can improve things in a follow-on PR)
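
   One possible follow-on improvement (purely a sketch, not part of this PR) would be to detect a constant `false` predicate up front and skip every container:

```rust
use std::sync::Arc;

use datafusion_common::ScalarValue;
use datafusion_physical_expr::{expressions::Literal, PhysicalExpr};

/// Returns true if the pruning predicate is the constant literal `false`,
/// in which case every row group / file could safely be skipped.
fn is_always_false(predicate: &Arc<dyn PhysicalExpr>) -> bool {
    predicate
        .as_any()
        .downcast_ref::<Literal>()
        .map(|literal| literal.value() == &ScalarValue::Boolean(Some(false)))
        .unwrap_or(false)
}
```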



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org