You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/22 22:15:27 UTC

[arrow-datafusion] branch master updated: Refine Err propagation and avoid unwrap in transform closures (#4318)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 94cd98208 Refine Err propagation and avoid unwrap in transform closures (#4318)
94cd98208 is described below

commit 94cd98208af4433fd992ac3b7b738db98f791635
Author: mingmwang <mi...@ebay.com>
AuthorDate: Wed Nov 23 06:15:21 2022 +0800

    Refine Err propagation and avoid unwrap in transform closures (#4318)
---
 .../src/physical_optimizer/coalesce_batches.rs     |   6 +-
 .../core/src/physical_optimizer/enforcement.rs     | 252 ++++++++++-----------
 .../src/physical_plan/joins/sort_merge_join.rs     |  49 ++--
 datafusion/core/src/physical_plan/joins/utils.rs   |   6 +-
 datafusion/core/src/physical_plan/rewrite.rs       |  10 +-
 datafusion/physical-expr/src/expressions/case.rs   |  12 +-
 datafusion/physical-expr/src/rewrite.rs            |  10 +-
 datafusion/physical-expr/src/utils.rs              |   6 +-
 8 files changed, 171 insertions(+), 180 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
index df7f9e552..941c5c141 100644
--- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs
+++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
@@ -59,12 +59,12 @@ impl PhysicalOptimizerRule for CoalesceBatches {
                 || plan_any.downcast_ref::<HashJoinExec>().is_some()
                 || plan_any.downcast_ref::<RepartitionExec>().is_some();
             if wrap_in_coalesce {
-                Some(Arc::new(CoalesceBatchesExec::new(
+                Ok(Some(Arc::new(CoalesceBatchesExec::new(
                     plan.clone(),
                     target_batch_size,
-                )))
+                ))))
             } else {
-                None
+                Ok(None)
             }
         })
     }
diff --git a/datafusion/core/src/physical_optimizer/enforcement.rs b/datafusion/core/src/physical_optimizer/enforcement.rs
index b27a4af06..f3f63fcb2 100644
--- a/datafusion/core/src/physical_optimizer/enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/enforcement.rs
@@ -86,14 +86,14 @@ impl PhysicalOptimizerRule for BasicEnforcement {
         new_plan.transform_up(&{
             |plan| {
                 let adjusted = if !top_down_join_key_reordering {
-                    reorder_join_keys_to_inputs(plan)
+                    reorder_join_keys_to_inputs(plan)?
                 } else {
                     plan
                 };
-                Some(ensure_distribution_and_ordering(
+                Ok(Some(ensure_distribution_and_ordering(
                     adjusted,
                     target_partitions,
-                ))
+                )?))
             }
         })
     }
@@ -151,7 +151,7 @@ impl PhysicalOptimizerRule for BasicEnforcement {
 ///
 fn adjust_input_keys_ordering(
     requirements: PlanWithKeyRequirements,
-) -> Option<PlanWithKeyRequirements> {
+) -> Result<Option<PlanWithKeyRequirements>> {
     let parent_required = requirements.required_key_ordering.clone();
     let plan_any = requirements.plan.as_any();
     if let Some(HashJoinExec {
@@ -169,26 +169,23 @@ fn adjust_input_keys_ordering(
             PartitionMode::Partitioned => {
                 let join_constructor =
                     |new_conditions: (Vec<(Column, Column)>, Vec<SortOptions>)| {
-                        Arc::new(
-                            HashJoinExec::try_new(
-                                left.clone(),
-                                right.clone(),
-                                new_conditions.0,
-                                filter.clone(),
-                                join_type,
-                                PartitionMode::Partitioned,
-                                null_equals_null,
-                            )
-                            .unwrap(),
-                        ) as Arc<dyn ExecutionPlan>
+                        Ok(Arc::new(HashJoinExec::try_new(
+                            left.clone(),
+                            right.clone(),
+                            new_conditions.0,
+                            filter.clone(),
+                            join_type,
+                            PartitionMode::Partitioned,
+                            null_equals_null,
+                        )?) as Arc<dyn ExecutionPlan>)
                     };
-                Some(reorder_partitioned_join_keys(
+                Ok(Some(reorder_partitioned_join_keys(
                     requirements.plan.clone(),
                     &parent_required,
                     on,
                     vec![],
                     &join_constructor,
-                ))
+                )?))
             }
             PartitionMode::CollectLeft => {
                 let new_right_request = match join_type {
@@ -206,11 +203,11 @@ fn adjust_input_keys_ordering(
                 };
 
                 // Push down requirements to the right side
-                Some(PlanWithKeyRequirements {
+                Ok(Some(PlanWithKeyRequirements {
                     plan: requirements.plan.clone(),
                     required_key_ordering: vec![],
                     request_key_ordering: vec![None, new_right_request],
-                })
+                }))
             }
         }
     } else if let Some(CrossJoinExec { left, .. }) =
@@ -218,14 +215,14 @@ fn adjust_input_keys_ordering(
     {
         let left_columns_len = left.schema().fields().len();
         // Push down requirements to the right side
-        Some(PlanWithKeyRequirements {
+        Ok(Some(PlanWithKeyRequirements {
             plan: requirements.plan.clone(),
             required_key_ordering: vec![],
             request_key_ordering: vec![
                 None,
                 shift_right_required(&parent_required, left_columns_len),
             ],
-        })
+        }))
     } else if let Some(SortMergeJoinExec {
         left,
         right,
@@ -238,25 +235,22 @@ fn adjust_input_keys_ordering(
     {
         let join_constructor =
             |new_conditions: (Vec<(Column, Column)>, Vec<SortOptions>)| {
-                Arc::new(
-                    SortMergeJoinExec::try_new(
-                        left.clone(),
-                        right.clone(),
-                        new_conditions.0,
-                        *join_type,
-                        new_conditions.1,
-                        *null_equals_null,
-                    )
-                    .unwrap(),
-                ) as Arc<dyn ExecutionPlan>
+                Ok(Arc::new(SortMergeJoinExec::try_new(
+                    left.clone(),
+                    right.clone(),
+                    new_conditions.0,
+                    *join_type,
+                    new_conditions.1,
+                    *null_equals_null,
+                )?) as Arc<dyn ExecutionPlan>)
             };
-        Some(reorder_partitioned_join_keys(
+        Ok(Some(reorder_partitioned_join_keys(
             requirements.plan.clone(),
             &parent_required,
             on,
             sort_options.clone(),
             &join_constructor,
-        ))
+        )?))
     } else if let Some(AggregateExec {
         mode,
         group_by,
@@ -268,19 +262,21 @@ fn adjust_input_keys_ordering(
     {
         if !parent_required.is_empty() {
             match mode {
-                AggregateMode::FinalPartitioned => Some(reorder_aggregate_keys(
+                AggregateMode::FinalPartitioned => Ok(Some(reorder_aggregate_keys(
                     requirements.plan.clone(),
                     &parent_required,
                     group_by,
                     aggr_expr,
                     input.clone(),
                     input_schema,
-                )),
-                _ => Some(PlanWithKeyRequirements::new(requirements.plan.clone())),
+                )?)),
+                _ => Ok(Some(PlanWithKeyRequirements::new(
+                    requirements.plan.clone(),
+                ))),
             }
         } else {
             // Keep everything unchanged
-            None
+            Ok(None)
         }
     } else if let Some(ProjectionExec { expr, .. }) =
         plan_any.downcast_ref::<ProjectionExec>()
@@ -290,28 +286,32 @@ fn adjust_input_keys_ordering(
         // Construct a mapping from new name to the the orginal Column
         let new_required = map_columns_before_projection(&parent_required, expr);
         if new_required.len() == parent_required.len() {
-            Some(PlanWithKeyRequirements {
+            Ok(Some(PlanWithKeyRequirements {
                 plan: requirements.plan.clone(),
                 required_key_ordering: vec![],
                 request_key_ordering: vec![Some(new_required.clone())],
-            })
+            }))
         } else {
             // Can not satisfy, clear the current requirements and generate new empty requirements
-            Some(PlanWithKeyRequirements::new(requirements.plan.clone()))
+            Ok(Some(PlanWithKeyRequirements::new(
+                requirements.plan.clone(),
+            )))
         }
     } else if plan_any.downcast_ref::<RepartitionExec>().is_some()
         || plan_any.downcast_ref::<CoalescePartitionsExec>().is_some()
         || plan_any.downcast_ref::<WindowAggExec>().is_some()
     {
-        Some(PlanWithKeyRequirements::new(requirements.plan.clone()))
+        Ok(Some(PlanWithKeyRequirements::new(
+            requirements.plan.clone(),
+        )))
     } else {
         // By default, push down the parent requirements to children
         let children_len = requirements.plan.children().len();
-        Some(PlanWithKeyRequirements {
+        Ok(Some(PlanWithKeyRequirements {
             plan: requirements.plan.clone(),
             required_key_ordering: vec![],
             request_key_ordering: vec![Some(parent_required.clone()); children_len],
-        })
+        }))
     }
 }
 
@@ -321,9 +321,9 @@ fn reorder_partitioned_join_keys<F>(
     on: &[(Column, Column)],
     sort_options: Vec<SortOptions>,
     join_constructor: &F,
-) -> PlanWithKeyRequirements
+) -> Result<PlanWithKeyRequirements>
 where
-    F: Fn((Vec<(Column, Column)>, Vec<SortOptions>)) -> Arc<dyn ExecutionPlan>,
+    F: Fn((Vec<(Column, Column)>, Vec<SortOptions>)) -> Result<Arc<dyn ExecutionPlan>>,
 {
     let join_key_pairs = extract_join_keys(on);
     if let Some((
@@ -344,27 +344,27 @@ where
                 new_sort_options.push(sort_options[new_positions[idx]])
             }
 
-            PlanWithKeyRequirements {
-                plan: join_constructor((new_join_on, new_sort_options)),
+            Ok(PlanWithKeyRequirements {
+                plan: join_constructor((new_join_on, new_sort_options))?,
                 required_key_ordering: vec![],
                 request_key_ordering: vec![Some(left_keys), Some(right_keys)],
-            }
+            })
         } else {
-            PlanWithKeyRequirements {
+            Ok(PlanWithKeyRequirements {
                 plan: join_plan,
                 required_key_ordering: vec![],
                 request_key_ordering: vec![Some(left_keys), Some(right_keys)],
-            }
+            })
         }
     } else {
-        PlanWithKeyRequirements {
+        Ok(PlanWithKeyRequirements {
             plan: join_plan,
             required_key_ordering: vec![],
             request_key_ordering: vec![
                 Some(join_key_pairs.left_keys),
                 Some(join_key_pairs.right_keys),
             ],
-        }
+        })
     }
 }
 
@@ -375,7 +375,7 @@ fn reorder_aggregate_keys(
     aggr_expr: &[Arc<dyn AggregateExpr>],
     agg_input: Arc<dyn ExecutionPlan>,
     input_schema: &SchemaRef,
-) -> PlanWithKeyRequirements {
+) -> Result<PlanWithKeyRequirements> {
     let out_put_columns = group_by
         .expr()
         .iter()
@@ -392,11 +392,11 @@ fn reorder_aggregate_keys(
         || !group_by.null_expr().is_empty()
         || expr_list_eq_strict_order(&out_put_exprs, parent_required)
     {
-        PlanWithKeyRequirements::new(agg_plan)
+        Ok(PlanWithKeyRequirements::new(agg_plan))
     } else {
         let new_positions = expected_expr_positions(&out_put_exprs, parent_required);
         match new_positions {
-            None => PlanWithKeyRequirements::new(agg_plan),
+            None => Ok(PlanWithKeyRequirements::new(agg_plan)),
             Some(positions) => {
                 let new_partial_agg = if let Some(AggregateExec {
                     mode,
@@ -416,16 +416,13 @@ fn reorder_aggregate_keys(
                         let new_partial_group_by =
                             PhysicalGroupBy::new_single(new_group_exprs);
                         // new Partial AggregateExec
-                        Some(Arc::new(
-                            AggregateExec::try_new(
-                                AggregateMode::Partial,
-                                new_partial_group_by,
-                                aggr_expr.clone(),
-                                input.clone(),
-                                input_schema.clone(),
-                            )
-                            .unwrap(),
-                        ))
+                        Some(Arc::new(AggregateExec::try_new(
+                            AggregateMode::Partial,
+                            new_partial_group_by,
+                            aggr_expr.clone(),
+                            input.clone(),
+                            input_schema.clone(),
+                        )?))
                     } else {
                         None
                     }
@@ -438,16 +435,13 @@ fn reorder_aggregate_keys(
                         new_group_exprs.push(group_by.expr()[idx].clone());
                     }
                     let new_group_by = PhysicalGroupBy::new_single(new_group_exprs);
-                    let new_final_agg = Arc::new(
-                        AggregateExec::try_new(
-                            AggregateMode::FinalPartitioned,
-                            new_group_by,
-                            aggr_expr.to_vec(),
-                            partial_agg,
-                            input_schema.clone(),
-                        )
-                        .unwrap(),
-                    );
+                    let new_final_agg = Arc::new(AggregateExec::try_new(
+                        AggregateMode::FinalPartitioned,
+                        new_group_by,
+                        aggr_expr.to_vec(),
+                        partial_agg,
+                        input_schema.clone(),
+                    )?);
 
                     // Need to create a new projection to change the expr ordering back
                     let mut proj_exprs = out_put_columns
@@ -475,11 +469,11 @@ fn reorder_aggregate_keys(
                         ))
                     }
                     // TODO merge adjacent Projections if there are
-                    PlanWithKeyRequirements::new(Arc::new(
-                        ProjectionExec::try_new(proj_exprs, new_final_agg).unwrap(),
-                    ))
+                    Ok(PlanWithKeyRequirements::new(Arc::new(
+                        ProjectionExec::try_new(proj_exprs, new_final_agg)?,
+                    )))
                 } else {
-                    PlanWithKeyRequirements::new(agg_plan)
+                    Ok(PlanWithKeyRequirements::new(agg_plan))
                 }
             }
         }
@@ -562,7 +556,7 @@ fn shift_right_required(
 /// and then can't apply the Top-Down reordering process.
 fn reorder_join_keys_to_inputs(
     plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
-) -> Arc<dyn crate::physical_plan::ExecutionPlan> {
+) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
     let plan_any = plan.as_any();
     if let Some(HashJoinExec {
         left,
@@ -593,26 +587,23 @@ fn reorder_join_keys_to_inputs(
                 ) {
                     if !new_positions.is_empty() {
                         let new_join_on = new_join_conditions(&left_keys, &right_keys);
-                        Arc::new(
-                            HashJoinExec::try_new(
-                                left.clone(),
-                                right.clone(),
-                                new_join_on,
-                                filter.clone(),
-                                join_type,
-                                PartitionMode::Partitioned,
-                                null_equals_null,
-                            )
-                            .unwrap(),
-                        )
+                        Ok(Arc::new(HashJoinExec::try_new(
+                            left.clone(),
+                            right.clone(),
+                            new_join_on,
+                            filter.clone(),
+                            join_type,
+                            PartitionMode::Partitioned,
+                            null_equals_null,
+                        )?))
                     } else {
-                        plan
+                        Ok(plan)
                     }
                 } else {
-                    plan
+                    Ok(plan)
                 }
             }
-            _ => plan,
+            _ => Ok(plan),
         }
     } else if let Some(SortMergeJoinExec {
         left,
@@ -644,25 +635,22 @@ fn reorder_join_keys_to_inputs(
                 for idx in 0..sort_options.len() {
                     new_sort_options.push(sort_options[new_positions[idx]])
                 }
-                Arc::new(
-                    SortMergeJoinExec::try_new(
-                        left.clone(),
-                        right.clone(),
-                        new_join_on,
-                        *join_type,
-                        new_sort_options,
-                        *null_equals_null,
-                    )
-                    .unwrap(),
-                )
+                Ok(Arc::new(SortMergeJoinExec::try_new(
+                    left.clone(),
+                    right.clone(),
+                    new_join_on,
+                    *join_type,
+                    new_sort_options,
+                    *null_equals_null,
+                )?))
             } else {
-                plan
+                Ok(plan)
             }
         } else {
-            plan
+            Ok(plan)
         }
     } else {
-        plan
+        Ok(plan)
     }
 }
 
@@ -839,9 +827,9 @@ fn new_join_conditions(
 fn ensure_distribution_and_ordering(
     plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
     target_partitions: usize,
-) -> Arc<dyn crate::physical_plan::ExecutionPlan> {
+) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
     if plan.children().is_empty() {
-        return plan;
+        return Ok(plan);
     }
     let required_input_distributions = plan.required_input_distribution();
     let required_input_orderings = plan.required_input_ordering();
@@ -858,17 +846,17 @@ fn ensure_distribution_and_ordering(
                 .output_partitioning()
                 .satisfy(required.clone(), || child.equivalence_properties())
             {
-                child
+                Ok(child)
             } else {
-                let new_child: Arc<dyn ExecutionPlan> = match required {
+                let new_child: Result<Arc<dyn ExecutionPlan>> = match required {
                     Distribution::SinglePartition
                         if child.output_partitioning().partition_count() > 1 =>
                     {
-                        Arc::new(CoalescePartitionsExec::new(child.clone()))
+                        Ok(Arc::new(CoalescePartitionsExec::new(child.clone())))
                     }
                     _ => {
                         let partition = required.create_partitioning(target_partitions);
-                        Arc::new(RepartitionExec::try_new(child, partition).unwrap())
+                        Ok(Arc::new(RepartitionExec::try_new(child, partition)?))
                     }
                 };
                 new_child
@@ -876,27 +864,29 @@ fn ensure_distribution_and_ordering(
         });
 
     // Add SortExec to guarantee output ordering
-    let new_children: Vec<Arc<dyn ExecutionPlan>> = children
+    let new_children: Result<Vec<Arc<dyn ExecutionPlan>>> = children
         .zip(required_input_orderings.into_iter())
-        .map(|(child, required)| {
+        .map(|(child_result, required)| {
+            let child = child_result?;
             if ordering_satisfy(child.output_ordering(), required, || {
                 child.equivalence_properties()
             }) {
-                child
+                Ok(child)
             } else {
                 let sort_expr = required.unwrap().to_vec();
                 if child.output_partitioning().partition_count() > 1 {
-                    Arc::new(SortExec::new_with_partitioning(
+                    Ok(Arc::new(SortExec::new_with_partitioning(
                         sort_expr, child, true, None,
-                    ))
+                    )) as Arc<dyn ExecutionPlan>)
                 } else {
-                    Arc::new(SortExec::try_new(sort_expr, child, None).unwrap())
+                    Ok(Arc::new(SortExec::try_new(sort_expr, child, None)?)
+                        as Arc<dyn ExecutionPlan>)
                 }
             }
         })
-        .collect::<Vec<_>>();
+        .collect();
 
-    with_new_children_if_necessary(plan, new_children).unwrap()
+    with_new_children_if_necessary(plan, new_children?)
 }
 
 /// Check the required ordering requirements are satisfied by the provided PhysicalSortExprs.
@@ -1713,7 +1703,7 @@ mod tests {
         let bottom_left_join = ensure_distribution_and_ordering(
             hash_join_exec(left.clone(), right.clone(), &join_on, &JoinType::Inner),
             10,
-        );
+        )?;
 
         // Projection(a as A, a as AA, b as B, c as C)
         let alias_pairs: Vec<(String, String)> = vec![
@@ -1743,7 +1733,7 @@ mod tests {
         let bottom_right_join = ensure_distribution_and_ordering(
             hash_join_exec(left, right.clone(), &join_on, &JoinType::Inner),
             10,
-        );
+        )?;
 
         // Join on (B == b1 and C == c and AA = a1)
         let top_join_on = vec![
@@ -1782,7 +1772,7 @@ mod tests {
             let top_join_plan =
                 format!("HashJoinExec: mode=Partitioned, join_type={:?}, on=[(Column {{ name: \"AA\", index: 1 }}, Column {{ name: \"a1\", index: 5 }}), (Column {{ name: \"B\", index: 2 }}, Column {{ name: \"b1\", index: 6 }}), (Column {{ name: \"C\", index: 3 }}, Column {{ name: \"c\", index: 2 }})]", &join_type);
 
-            let reordered = reorder_join_keys_to_inputs(top_join);
+            let reordered = reorder_join_keys_to_inputs(top_join)?;
 
             // The top joins' join key ordering is adjusted based on the children inputs.
             let expected = &[
@@ -1832,7 +1822,7 @@ mod tests {
         let bottom_left_join = ensure_distribution_and_ordering(
             hash_join_exec(left.clone(), right.clone(), &join_on, &JoinType::Inner),
             10,
-        );
+        )?;
 
         // Projection(a as A, a as AA, b as B, c as C)
         let alias_pairs: Vec<(String, String)> = vec![
@@ -1862,7 +1852,7 @@ mod tests {
         let bottom_right_join = ensure_distribution_and_ordering(
             hash_join_exec(left, right.clone(), &join_on, &JoinType::Inner),
             10,
-        );
+        )?;
 
         // Join on (B == b1 and C == c and AA = a1)
         let top_join_on = vec![
@@ -1901,7 +1891,7 @@ mod tests {
             let top_join_plan =
                 format!("HashJoinExec: mode=Partitioned, join_type={:?}, on=[(Column {{ name: \"C\", index: 3 }}, Column {{ name: \"c\", index: 2 }}), (Column {{ name: \"B\", index: 2 }}, Column {{ name: \"b1\", index: 6 }}), (Column {{ name: \"AA\", index: 1 }}, Column {{ name: \"a1\", index: 5 }})]", &join_type);
 
-            let reordered = reorder_join_keys_to_inputs(top_join);
+            let reordered = reorder_join_keys_to_inputs(top_join)?;
 
             // The top joins' join key ordering is adjusted based on the children inputs.
             let expected = &[
diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
index 5ea1b22a4..93d2b862e 100644
--- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
+++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
@@ -153,31 +153,32 @@ impl SortMergeJoinExec {
                 .map(|sort_exprs| sort_exprs.to_vec()),
             JoinType::Right => {
                 let left_columns_len = left.schema().fields.len();
-                right.output_ordering().map(|sort_exprs| {
-                    sort_exprs
-                        .iter()
-                        .map(|e| {
-                            let new_expr = e
-                                .expr
-                                .clone()
-                                .transform_down(&|e| match e
-                                    .as_any()
-                                    .downcast_ref::<Column>()
-                                {
-                                    Some(col) => Some(Arc::new(Column::new(
-                                        col.name(),
-                                        left_columns_len + col.index(),
-                                    ))),
-                                    None => None,
+                right
+                    .output_ordering()
+                    .map(|sort_exprs| {
+                        let new_sort_exprs: Result<Vec<PhysicalSortExpr>> = sort_exprs
+                            .iter()
+                            .map(|e| {
+                                let new_expr =
+                                    e.expr.clone().transform_down(&|e| match e
+                                        .as_any()
+                                        .downcast_ref::<Column>(
+                                    ) {
+                                        Some(col) => Ok(Some(Arc::new(Column::new(
+                                            col.name(),
+                                            left_columns_len + col.index(),
+                                        )))),
+                                        None => Ok(None),
+                                    });
+                                Ok(PhysicalSortExpr {
+                                    expr: new_expr?,
+                                    options: e.options,
                                 })
-                                .unwrap();
-                            PhysicalSortExpr {
-                                expr: new_expr,
-                                options: e.options,
-                            }
-                        })
-                        .collect::<Vec<_>>()
-                })
+                            })
+                            .collect();
+                        new_sort_exprs
+                    })
+                    .map_or(Ok(None), |v| v.map(Some))?
             }
             JoinType::Full => None,
         };
diff --git a/datafusion/core/src/physical_plan/joins/utils.rs b/datafusion/core/src/physical_plan/joins/utils.rs
index cc71ee6af..936b9343b 100644
--- a/datafusion/core/src/physical_plan/joins/utils.rs
+++ b/datafusion/core/src/physical_plan/joins/utils.rs
@@ -123,11 +123,11 @@ pub fn adjust_right_output_partitioning(
                 .into_iter()
                 .map(|expr| {
                     expr.transform_down(&|e| match e.as_any().downcast_ref::<Column>() {
-                        Some(col) => Some(Arc::new(Column::new(
+                        Some(col) => Ok(Some(Arc::new(Column::new(
                             col.name(),
                             left_columns_len + col.index(),
-                        ))),
-                        None => None,
+                        )))),
+                        None => Ok(None),
                     })
                     .unwrap()
                 })
diff --git a/datafusion/core/src/physical_plan/rewrite.rs b/datafusion/core/src/physical_plan/rewrite.rs
index a431dd31a..2972b546b 100644
--- a/datafusion/core/src/physical_plan/rewrite.rs
+++ b/datafusion/core/src/physical_plan/rewrite.rs
@@ -78,7 +78,7 @@ pub trait TreeNodeRewritable: Clone {
     /// The default tree traversal direction is transform_up(Postorder Traversal).
     fn transform<F>(self, op: &F) -> Result<Self>
     where
-        F: Fn(Self) -> Option<Self>,
+        F: Fn(Self) -> Result<Option<Self>>,
     {
         self.transform_up(op)
     }
@@ -88,10 +88,10 @@ pub trait TreeNodeRewritable: Clone {
     /// When the `op` does not apply to a given node, it is left unchanged.
     fn transform_down<F>(self, op: &F) -> Result<Self>
     where
-        F: Fn(Self) -> Option<Self>,
+        F: Fn(Self) -> Result<Option<Self>>,
     {
         let node_cloned = self.clone();
-        let after_op = match op(node_cloned) {
+        let after_op = match op(node_cloned)? {
             Some(value) => value,
             None => self,
         };
@@ -103,12 +103,12 @@ pub trait TreeNodeRewritable: Clone {
     /// When the `op` does not apply to a given node, it is left unchanged.
     fn transform_up<F>(self, op: &F) -> Result<Self>
     where
-        F: Fn(Self) -> Option<Self>,
+        F: Fn(Self) -> Result<Option<Self>>,
     {
         let after_op_children = self.map_children(|node| node.transform_up(op))?;
 
         let after_op_children_clone = after_op_children.clone();
-        let new_node = match op(after_op_children) {
+        let new_node = match op(after_op_children)? {
             Some(value) => value,
             None => after_op_children_clone,
         };
diff --git a/datafusion/physical-expr/src/expressions/case.rs b/datafusion/physical-expr/src/expressions/case.rs
index ddf1d6bae..f499f5ebf 100644
--- a/datafusion/physical-expr/src/expressions/case.rs
+++ b/datafusion/physical-expr/src/expressions/case.rs
@@ -874,11 +874,11 @@ mod tests {
                 &|e| match e.as_any().downcast_ref::<crate::expressions::Literal>() {
                     Some(lit_value) => match lit_value.value() {
                         ScalarValue::Utf8(Some(str_value)) => {
-                            Some(lit(str_value.to_uppercase()))
+                            Ok(Some(lit(str_value.to_uppercase())))
                         }
-                        _ => None,
+                        _ => Ok(None),
                     },
-                    _ => None,
+                    _ => Ok(None),
                 },
             )
             .unwrap();
@@ -891,11 +891,11 @@ mod tests {
             {
                 Some(lit_value) => match lit_value.value() {
                     ScalarValue::Utf8(Some(str_value)) => {
-                        Some(lit(str_value.to_uppercase()))
+                        Ok(Some(lit(str_value.to_uppercase())))
                     }
-                    _ => None,
+                    _ => Ok(None),
                 },
-                _ => None,
+                _ => Ok(None),
             })
             .unwrap();
 
diff --git a/datafusion/physical-expr/src/rewrite.rs b/datafusion/physical-expr/src/rewrite.rs
index 487d08455..327eabd4b 100644
--- a/datafusion/physical-expr/src/rewrite.rs
+++ b/datafusion/physical-expr/src/rewrite.rs
@@ -76,7 +76,7 @@ pub trait TreeNodeRewritable: Clone {
     /// The default tree traversal direction is transform_up(Postorder Traversal).
     fn transform<F>(self, op: &F) -> Result<Self>
     where
-        F: Fn(Self) -> Option<Self>,
+        F: Fn(Self) -> Result<Option<Self>>,
     {
         self.transform_up(op)
     }
@@ -86,10 +86,10 @@ pub trait TreeNodeRewritable: Clone {
     /// When the `op` does not apply to a given node, it is left unchanged.
     fn transform_down<F>(self, op: &F) -> Result<Self>
     where
-        F: Fn(Self) -> Option<Self>,
+        F: Fn(Self) -> Result<Option<Self>>,
     {
         let node_cloned = self.clone();
-        let after_op = match op(node_cloned) {
+        let after_op = match op(node_cloned)? {
             Some(value) => value,
             None => self,
         };
@@ -101,12 +101,12 @@ pub trait TreeNodeRewritable: Clone {
     /// When the `op` does not apply to a given node, it is left unchanged.
     fn transform_up<F>(self, op: &F) -> Result<Self>
     where
-        F: Fn(Self) -> Option<Self>,
+        F: Fn(Self) -> Result<Option<Self>>,
     {
         let after_op_children = self.map_children(|node| node.transform_up(op))?;
 
         let after_op_children_clone = after_op_children.clone();
-        let new_node = match op(after_op_children) {
+        let new_node = match op(after_op_children)? {
             Some(value) => value,
             None => after_op_children_clone,
         };
diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs
index 78ce9c931..ea64d7397 100644
--- a/datafusion/physical-expr/src/utils.rs
+++ b/datafusion/physical-expr/src/utils.rs
@@ -141,7 +141,7 @@ pub fn normalize_out_expr_with_alias_schema(
                     }
                     None => None,
                 };
-            normalized_form
+            Ok(normalized_form)
         })
         .unwrap_or(expr)
 }
@@ -161,9 +161,9 @@ pub fn normalize_expr_with_equivalence_properties(
                         break;
                     }
                 }
-                normalized
+                Ok(normalized)
             }
-            None => None,
+            None => Ok(None),
         })
         .unwrap_or(expr)
 }