Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/22 22:15:27 UTC
[arrow-datafusion] branch master updated: Refine Err propagation and avoid unwrap in transform closures (#4318)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 94cd98208 Refine Err propagation and avoid unwrap in transform closures (#4318)
94cd98208 is described below
commit 94cd98208af4433fd992ac3b7b738db98f791635
Author: mingmwang <mi...@ebay.com>
AuthorDate: Wed Nov 23 06:15:21 2022 +0800
Refine Err propagation and avoid unwrap in transform closures (#4318)
---
.../src/physical_optimizer/coalesce_batches.rs | 6 +-
.../core/src/physical_optimizer/enforcement.rs | 252 ++++++++++-----------
.../src/physical_plan/joins/sort_merge_join.rs | 49 ++--
datafusion/core/src/physical_plan/joins/utils.rs | 6 +-
datafusion/core/src/physical_plan/rewrite.rs | 10 +-
datafusion/physical-expr/src/expressions/case.rs | 12 +-
datafusion/physical-expr/src/rewrite.rs | 10 +-
datafusion/physical-expr/src/utils.rs | 6 +-
8 files changed, 171 insertions(+), 180 deletions(-)
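The heart of this change is the closure signature used by the tree-rewriting helpers: transform closures previously returned Option<Self>, so any fallible constructor called inside them had to .unwrap() and panic on error; they now return Result<Option<Self>>, letting errors propagate to the caller with `?`. A minimal, self-contained sketch of the before/after shape (illustrative names, not the DataFusion API):

    use std::error::Error;

    type Result<T> = std::result::Result<T, Box<dyn Error>>;

    // Old shape: the closure cannot fail, so fallible work inside it
    // must panic via unwrap().
    fn transform_old<T: Clone>(node: T, op: &dyn Fn(T) -> Option<T>) -> T {
        match op(node.clone()) {
            Some(rewritten) => rewritten,
            None => node, // None means "leave this node unchanged"
        }
    }

    // New shape: the closure returns Result<Option<T>>, so errors
    // bubble up with `?` instead of panicking.
    fn transform_new<T: Clone>(node: T, op: &dyn Fn(T) -> Result<Option<T>>) -> Result<T> {
        match op(node.clone())? {
            Some(rewritten) => Ok(rewritten),
            None => Ok(node),
        }
    }

    fn main() -> Result<()> {
        // A rewrite that fails on negative nodes instead of panicking.
        let op = |n: i64| -> Result<Option<i64>> {
            if n < 0 {
                Err("negative node".into())
            } else if n % 2 == 0 {
                Ok(Some(n / 2)) // rewrite even nodes
            } else {
                Ok(None) // leave odd nodes unchanged
            }
        };
        assert_eq!(transform_new(8, &op)?, 4);
        assert_eq!(transform_new(7, &op)?, 7);
        assert!(transform_new(-1, &op).is_err()); // an Err, not a panic
        assert_eq!(transform_old(8, &|n: i64| Some(n / 2)), 4);
        Ok(())
    }

This is why the hunks below mostly replace Some(...)/None with Ok(Some(...))/Ok(None), and .unwrap() calls with `?`.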
diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
index df7f9e552..941c5c141 100644
--- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs
+++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
@@ -59,12 +59,12 @@ impl PhysicalOptimizerRule for CoalesceBatches {
|| plan_any.downcast_ref::<HashJoinExec>().is_some()
|| plan_any.downcast_ref::<RepartitionExec>().is_some();
if wrap_in_coalesce {
- Some(Arc::new(CoalesceBatchesExec::new(
+ Ok(Some(Arc::new(CoalesceBatchesExec::new(
plan.clone(),
target_batch_size,
- )))
+ ))))
} else {
- None
+ Ok(None)
}
})
}
diff --git a/datafusion/core/src/physical_optimizer/enforcement.rs b/datafusion/core/src/physical_optimizer/enforcement.rs
index b27a4af06..f3f63fcb2 100644
--- a/datafusion/core/src/physical_optimizer/enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/enforcement.rs
@@ -86,14 +86,14 @@ impl PhysicalOptimizerRule for BasicEnforcement {
new_plan.transform_up(&{
|plan| {
let adjusted = if !top_down_join_key_reordering {
- reorder_join_keys_to_inputs(plan)
+ reorder_join_keys_to_inputs(plan)?
} else {
plan
};
- Some(ensure_distribution_and_ordering(
+ Ok(Some(ensure_distribution_and_ordering(
adjusted,
target_partitions,
- ))
+ )?))
}
})
}
@@ -151,7 +151,7 @@ impl PhysicalOptimizerRule for BasicEnforcement {
///
fn adjust_input_keys_ordering(
requirements: PlanWithKeyRequirements,
-) -> Option<PlanWithKeyRequirements> {
+) -> Result<Option<PlanWithKeyRequirements>> {
let parent_required = requirements.required_key_ordering.clone();
let plan_any = requirements.plan.as_any();
if let Some(HashJoinExec {
@@ -169,26 +169,23 @@ fn adjust_input_keys_ordering(
PartitionMode::Partitioned => {
let join_constructor =
|new_conditions: (Vec<(Column, Column)>, Vec<SortOptions>)| {
- Arc::new(
- HashJoinExec::try_new(
- left.clone(),
- right.clone(),
- new_conditions.0,
- filter.clone(),
- join_type,
- PartitionMode::Partitioned,
- null_equals_null,
- )
- .unwrap(),
- ) as Arc<dyn ExecutionPlan>
+ Ok(Arc::new(HashJoinExec::try_new(
+ left.clone(),
+ right.clone(),
+ new_conditions.0,
+ filter.clone(),
+ join_type,
+ PartitionMode::Partitioned,
+ null_equals_null,
+ )?) as Arc<dyn ExecutionPlan>)
};
- Some(reorder_partitioned_join_keys(
+ Ok(Some(reorder_partitioned_join_keys(
requirements.plan.clone(),
&parent_required,
on,
vec![],
&join_constructor,
- ))
+ )?))
}
PartitionMode::CollectLeft => {
let new_right_request = match join_type {
@@ -206,11 +203,11 @@ fn adjust_input_keys_ordering(
};
// Push down requirements to the right side
- Some(PlanWithKeyRequirements {
+ Ok(Some(PlanWithKeyRequirements {
plan: requirements.plan.clone(),
required_key_ordering: vec![],
request_key_ordering: vec![None, new_right_request],
- })
+ }))
}
}
} else if let Some(CrossJoinExec { left, .. }) =
@@ -218,14 +215,14 @@ fn adjust_input_keys_ordering(
{
let left_columns_len = left.schema().fields().len();
// Push down requirements to the right side
- Some(PlanWithKeyRequirements {
+ Ok(Some(PlanWithKeyRequirements {
plan: requirements.plan.clone(),
required_key_ordering: vec![],
request_key_ordering: vec![
None,
shift_right_required(&parent_required, left_columns_len),
],
- })
+ }))
} else if let Some(SortMergeJoinExec {
left,
right,
@@ -238,25 +235,22 @@ fn adjust_input_keys_ordering(
{
let join_constructor =
|new_conditions: (Vec<(Column, Column)>, Vec<SortOptions>)| {
- Arc::new(
- SortMergeJoinExec::try_new(
- left.clone(),
- right.clone(),
- new_conditions.0,
- *join_type,
- new_conditions.1,
- *null_equals_null,
- )
- .unwrap(),
- ) as Arc<dyn ExecutionPlan>
+ Ok(Arc::new(SortMergeJoinExec::try_new(
+ left.clone(),
+ right.clone(),
+ new_conditions.0,
+ *join_type,
+ new_conditions.1,
+ *null_equals_null,
+ )?) as Arc<dyn ExecutionPlan>)
};
- Some(reorder_partitioned_join_keys(
+ Ok(Some(reorder_partitioned_join_keys(
requirements.plan.clone(),
&parent_required,
on,
sort_options.clone(),
&join_constructor,
- ))
+ )?))
} else if let Some(AggregateExec {
mode,
group_by,
@@ -268,19 +262,21 @@ fn adjust_input_keys_ordering(
{
if !parent_required.is_empty() {
match mode {
- AggregateMode::FinalPartitioned => Some(reorder_aggregate_keys(
+ AggregateMode::FinalPartitioned => Ok(Some(reorder_aggregate_keys(
requirements.plan.clone(),
&parent_required,
group_by,
aggr_expr,
input.clone(),
input_schema,
- )),
- _ => Some(PlanWithKeyRequirements::new(requirements.plan.clone())),
+ )?)),
+ _ => Ok(Some(PlanWithKeyRequirements::new(
+ requirements.plan.clone(),
+ ))),
}
} else {
// Keep everything unchanged
- None
+ Ok(None)
}
} else if let Some(ProjectionExec { expr, .. }) =
plan_any.downcast_ref::<ProjectionExec>()
@@ -290,28 +286,32 @@ fn adjust_input_keys_ordering(
// Construct a mapping from new name to the original Column
let new_required = map_columns_before_projection(&parent_required, expr);
if new_required.len() == parent_required.len() {
- Some(PlanWithKeyRequirements {
+ Ok(Some(PlanWithKeyRequirements {
plan: requirements.plan.clone(),
required_key_ordering: vec![],
request_key_ordering: vec![Some(new_required.clone())],
- })
+ }))
} else {
// Cannot satisfy; clear the current requirements and generate new empty requirements
- Some(PlanWithKeyRequirements::new(requirements.plan.clone()))
+ Ok(Some(PlanWithKeyRequirements::new(
+ requirements.plan.clone(),
+ )))
}
} else if plan_any.downcast_ref::<RepartitionExec>().is_some()
|| plan_any.downcast_ref::<CoalescePartitionsExec>().is_some()
|| plan_any.downcast_ref::<WindowAggExec>().is_some()
{
- Some(PlanWithKeyRequirements::new(requirements.plan.clone()))
+ Ok(Some(PlanWithKeyRequirements::new(
+ requirements.plan.clone(),
+ )))
} else {
// By default, push down the parent requirements to children
let children_len = requirements.plan.children().len();
- Some(PlanWithKeyRequirements {
+ Ok(Some(PlanWithKeyRequirements {
plan: requirements.plan.clone(),
required_key_ordering: vec![],
request_key_ordering: vec![Some(parent_required.clone()); children_len],
- })
+ }))
}
}
@@ -321,9 +321,9 @@ fn reorder_partitioned_join_keys<F>(
on: &[(Column, Column)],
sort_options: Vec<SortOptions>,
join_constructor: &F,
-) -> PlanWithKeyRequirements
+) -> Result<PlanWithKeyRequirements>
where
- F: Fn((Vec<(Column, Column)>, Vec<SortOptions>)) -> Arc<dyn ExecutionPlan>,
+ F: Fn((Vec<(Column, Column)>, Vec<SortOptions>)) -> Result<Arc<dyn ExecutionPlan>>,
{
let join_key_pairs = extract_join_keys(on);
if let Some((
@@ -344,27 +344,27 @@ where
new_sort_options.push(sort_options[new_positions[idx]])
}
- PlanWithKeyRequirements {
- plan: join_constructor((new_join_on, new_sort_options)),
+ Ok(PlanWithKeyRequirements {
+ plan: join_constructor((new_join_on, new_sort_options))?,
required_key_ordering: vec![],
request_key_ordering: vec![Some(left_keys), Some(right_keys)],
- }
+ })
} else {
- PlanWithKeyRequirements {
+ Ok(PlanWithKeyRequirements {
plan: join_plan,
required_key_ordering: vec![],
request_key_ordering: vec![Some(left_keys), Some(right_keys)],
- }
+ })
}
} else {
- PlanWithKeyRequirements {
+ Ok(PlanWithKeyRequirements {
plan: join_plan,
required_key_ordering: vec![],
request_key_ordering: vec![
Some(join_key_pairs.left_keys),
Some(join_key_pairs.right_keys),
],
- }
+ })
}
}
@@ -375,7 +375,7 @@ fn reorder_aggregate_keys(
aggr_expr: &[Arc<dyn AggregateExpr>],
agg_input: Arc<dyn ExecutionPlan>,
input_schema: &SchemaRef,
-) -> PlanWithKeyRequirements {
+) -> Result<PlanWithKeyRequirements> {
let out_put_columns = group_by
.expr()
.iter()
@@ -392,11 +392,11 @@ fn reorder_aggregate_keys(
|| !group_by.null_expr().is_empty()
|| expr_list_eq_strict_order(&out_put_exprs, parent_required)
{
- PlanWithKeyRequirements::new(agg_plan)
+ Ok(PlanWithKeyRequirements::new(agg_plan))
} else {
let new_positions = expected_expr_positions(&out_put_exprs, parent_required);
match new_positions {
- None => PlanWithKeyRequirements::new(agg_plan),
+ None => Ok(PlanWithKeyRequirements::new(agg_plan)),
Some(positions) => {
let new_partial_agg = if let Some(AggregateExec {
mode,
@@ -416,16 +416,13 @@ fn reorder_aggregate_keys(
let new_partial_group_by =
PhysicalGroupBy::new_single(new_group_exprs);
// new Partial AggregateExec
- Some(Arc::new(
- AggregateExec::try_new(
- AggregateMode::Partial,
- new_partial_group_by,
- aggr_expr.clone(),
- input.clone(),
- input_schema.clone(),
- )
- .unwrap(),
- ))
+ Some(Arc::new(AggregateExec::try_new(
+ AggregateMode::Partial,
+ new_partial_group_by,
+ aggr_expr.clone(),
+ input.clone(),
+ input_schema.clone(),
+ )?))
} else {
None
}
@@ -438,16 +435,13 @@ fn reorder_aggregate_keys(
new_group_exprs.push(group_by.expr()[idx].clone());
}
let new_group_by = PhysicalGroupBy::new_single(new_group_exprs);
- let new_final_agg = Arc::new(
- AggregateExec::try_new(
- AggregateMode::FinalPartitioned,
- new_group_by,
- aggr_expr.to_vec(),
- partial_agg,
- input_schema.clone(),
- )
- .unwrap(),
- );
+ let new_final_agg = Arc::new(AggregateExec::try_new(
+ AggregateMode::FinalPartitioned,
+ new_group_by,
+ aggr_expr.to_vec(),
+ partial_agg,
+ input_schema.clone(),
+ )?);
// Need to create a new projection to change the expr ordering back
let mut proj_exprs = out_put_columns
@@ -475,11 +469,11 @@ fn reorder_aggregate_keys(
))
}
// TODO merge adjacent Projections if there are
- PlanWithKeyRequirements::new(Arc::new(
- ProjectionExec::try_new(proj_exprs, new_final_agg).unwrap(),
- ))
+ Ok(PlanWithKeyRequirements::new(Arc::new(
+ ProjectionExec::try_new(proj_exprs, new_final_agg)?,
+ )))
} else {
- PlanWithKeyRequirements::new(agg_plan)
+ Ok(PlanWithKeyRequirements::new(agg_plan))
}
}
}
@@ -562,7 +556,7 @@ fn shift_right_required(
/// and then can't apply the Top-Down reordering process.
fn reorder_join_keys_to_inputs(
plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
-) -> Arc<dyn crate::physical_plan::ExecutionPlan> {
+) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
let plan_any = plan.as_any();
if let Some(HashJoinExec {
left,
@@ -593,26 +587,23 @@ fn reorder_join_keys_to_inputs(
) {
if !new_positions.is_empty() {
let new_join_on = new_join_conditions(&left_keys, &right_keys);
- Arc::new(
- HashJoinExec::try_new(
- left.clone(),
- right.clone(),
- new_join_on,
- filter.clone(),
- join_type,
- PartitionMode::Partitioned,
- null_equals_null,
- )
- .unwrap(),
- )
+ Ok(Arc::new(HashJoinExec::try_new(
+ left.clone(),
+ right.clone(),
+ new_join_on,
+ filter.clone(),
+ join_type,
+ PartitionMode::Partitioned,
+ null_equals_null,
+ )?))
} else {
- plan
+ Ok(plan)
}
} else {
- plan
+ Ok(plan)
}
}
- _ => plan,
+ _ => Ok(plan),
}
} else if let Some(SortMergeJoinExec {
left,
@@ -644,25 +635,22 @@ fn reorder_join_keys_to_inputs(
for idx in 0..sort_options.len() {
new_sort_options.push(sort_options[new_positions[idx]])
}
- Arc::new(
- SortMergeJoinExec::try_new(
- left.clone(),
- right.clone(),
- new_join_on,
- *join_type,
- new_sort_options,
- *null_equals_null,
- )
- .unwrap(),
- )
+ Ok(Arc::new(SortMergeJoinExec::try_new(
+ left.clone(),
+ right.clone(),
+ new_join_on,
+ *join_type,
+ new_sort_options,
+ *null_equals_null,
+ )?))
} else {
- plan
+ Ok(plan)
}
} else {
- plan
+ Ok(plan)
}
} else {
- plan
+ Ok(plan)
}
}
@@ -839,9 +827,9 @@ fn new_join_conditions(
fn ensure_distribution_and_ordering(
plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
target_partitions: usize,
-) -> Arc<dyn crate::physical_plan::ExecutionPlan> {
+) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
if plan.children().is_empty() {
- return plan;
+ return Ok(plan);
}
let required_input_distributions = plan.required_input_distribution();
let required_input_orderings = plan.required_input_ordering();
@@ -858,17 +846,17 @@ fn ensure_distribution_and_ordering(
.output_partitioning()
.satisfy(required.clone(), || child.equivalence_properties())
{
- child
+ Ok(child)
} else {
- let new_child: Arc<dyn ExecutionPlan> = match required {
+ let new_child: Result<Arc<dyn ExecutionPlan>> = match required {
Distribution::SinglePartition
if child.output_partitioning().partition_count() > 1 =>
{
- Arc::new(CoalescePartitionsExec::new(child.clone()))
+ Ok(Arc::new(CoalescePartitionsExec::new(child.clone())))
}
_ => {
let partition = required.create_partitioning(target_partitions);
- Arc::new(RepartitionExec::try_new(child, partition).unwrap())
+ Ok(Arc::new(RepartitionExec::try_new(child, partition)?))
}
};
new_child
@@ -876,27 +864,29 @@ fn ensure_distribution_and_ordering(
});
// Add SortExec to guarantee output ordering
- let new_children: Vec<Arc<dyn ExecutionPlan>> = children
+ let new_children: Result<Vec<Arc<dyn ExecutionPlan>>> = children
.zip(required_input_orderings.into_iter())
- .map(|(child, required)| {
+ .map(|(child_result, required)| {
+ let child = child_result?;
if ordering_satisfy(child.output_ordering(), required, || {
child.equivalence_properties()
}) {
- child
+ Ok(child)
} else {
let sort_expr = required.unwrap().to_vec();
if child.output_partitioning().partition_count() > 1 {
- Arc::new(SortExec::new_with_partitioning(
+ Ok(Arc::new(SortExec::new_with_partitioning(
sort_expr, child, true, None,
- ))
+ )) as Arc<dyn ExecutionPlan>)
} else {
- Arc::new(SortExec::try_new(sort_expr, child, None).unwrap())
+ Ok(Arc::new(SortExec::try_new(sort_expr, child, None)?)
+ as Arc<dyn ExecutionPlan>)
}
}
})
- .collect::<Vec<_>>();
+ .collect();
- with_new_children_if_necessary(plan, new_children).unwrap()
+ with_new_children_if_necessary(plan, new_children?)
}
/// Check that the required ordering requirements are satisfied by the provided PhysicalSortExprs.
@@ -1713,7 +1703,7 @@ mod tests {
let bottom_left_join = ensure_distribution_and_ordering(
hash_join_exec(left.clone(), right.clone(), &join_on, &JoinType::Inner),
10,
- );
+ )?;
// Projection(a as A, a as AA, b as B, c as C)
let alias_pairs: Vec<(String, String)> = vec![
@@ -1743,7 +1733,7 @@ mod tests {
let bottom_right_join = ensure_distribution_and_ordering(
hash_join_exec(left, right.clone(), &join_on, &JoinType::Inner),
10,
- );
+ )?;
// Join on (B == b1 and C == c and AA = a1)
let top_join_on = vec![
@@ -1782,7 +1772,7 @@ mod tests {
let top_join_plan =
format!("HashJoinExec: mode=Partitioned, join_type={:?}, on=[(Column {{ name: \"AA\", index: 1 }}, Column {{ name: \"a1\", index: 5 }}), (Column {{ name: \"B\", index: 2 }}, Column {{ name: \"b1\", index: 6 }}), (Column {{ name: \"C\", index: 3 }}, Column {{ name: \"c\", index: 2 }})]", &join_type);
- let reordered = reorder_join_keys_to_inputs(top_join);
+ let reordered = reorder_join_keys_to_inputs(top_join)?;
// The top join's join key ordering is adjusted based on the children's inputs.
let expected = &[
@@ -1832,7 +1822,7 @@ mod tests {
let bottom_left_join = ensure_distribution_and_ordering(
hash_join_exec(left.clone(), right.clone(), &join_on, &JoinType::Inner),
10,
- );
+ )?;
// Projection(a as A, a as AA, b as B, c as C)
let alias_pairs: Vec<(String, String)> = vec![
@@ -1862,7 +1852,7 @@ mod tests {
let bottom_right_join = ensure_distribution_and_ordering(
hash_join_exec(left, right.clone(), &join_on, &JoinType::Inner),
10,
- );
+ )?;
// Join on (B == b1 and C == c and AA = a1)
let top_join_on = vec![
@@ -1901,7 +1891,7 @@ mod tests {
let top_join_plan =
format!("HashJoinExec: mode=Partitioned, join_type={:?}, on=[(Column {{ name: \"C\", index: 3 }}, Column {{ name: \"c\", index: 2 }}), (Column {{ name: \"B\", index: 2 }}, Column {{ name: \"b1\", index: 6 }}), (Column {{ name: \"AA\", index: 1 }}, Column {{ name: \"a1\", index: 5 }})]", &join_type);
- let reordered = reorder_join_keys_to_inputs(top_join);
+ let reordered = reorder_join_keys_to_inputs(top_join)?;
// The top join's join key ordering is adjusted based on the children's inputs.
let expected = &[
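The enforcement.rs hunks above also change the join_constructor helper closures: they now return Result<Arc<dyn ExecutionPlan>>, so HashJoinExec::try_new / SortMergeJoinExec::try_new can be followed by `?` rather than .unwrap(). A condensed sketch of that constructor-closure pattern, using hypothetical stand-in types:

    use std::error::Error;

    type Result<T> = std::result::Result<T, Box<dyn Error>>;

    // Stand-in for a join operator with a fallible constructor
    // (think HashJoinExec::try_new).
    struct Join {
        keys: Vec<(String, String)>,
    }

    impl Join {
        fn try_new(keys: Vec<(String, String)>) -> Result<Self> {
            if keys.is_empty() {
                return Err("join requires at least one key pair".into());
            }
            Ok(Join { keys })
        }
    }

    // The constructor closure returns Result, so the caller writes
    // `join_constructor(new_keys)?` instead of unwrapping inside the closure.
    fn reorder_keys<F>(keys: Vec<(String, String)>, join_constructor: &F) -> Result<Join>
    where
        F: Fn(Vec<(String, String)>) -> Result<Join>,
    {
        let mut reordered = keys;
        reordered.reverse(); // stand-in for the real key-reordering logic
        join_constructor(reordered)
    }

    fn main() -> Result<()> {
        let ctor = |keys| Join::try_new(keys);
        let join = reorder_keys(vec![("a".into(), "b".into())], &ctor)?;
        assert_eq!(join.keys.len(), 1);
        assert!(reorder_keys(vec![], &ctor).is_err()); // propagated, not panicked
        Ok(())
    }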
diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
index 5ea1b22a4..93d2b862e 100644
--- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
+++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
@@ -153,31 +153,32 @@ impl SortMergeJoinExec {
.map(|sort_exprs| sort_exprs.to_vec()),
JoinType::Right => {
let left_columns_len = left.schema().fields.len();
- right.output_ordering().map(|sort_exprs| {
- sort_exprs
- .iter()
- .map(|e| {
- let new_expr = e
- .expr
- .clone()
- .transform_down(&|e| match e
- .as_any()
- .downcast_ref::<Column>()
- {
- Some(col) => Some(Arc::new(Column::new(
- col.name(),
- left_columns_len + col.index(),
- ))),
- None => None,
+ right
+ .output_ordering()
+ .map(|sort_exprs| {
+ let new_sort_exprs: Result<Vec<PhysicalSortExpr>> = sort_exprs
+ .iter()
+ .map(|e| {
+ let new_expr =
+ e.expr.clone().transform_down(&|e| match e
+ .as_any()
+ .downcast_ref::<Column>(
+ ) {
+ Some(col) => Ok(Some(Arc::new(Column::new(
+ col.name(),
+ left_columns_len + col.index(),
+ )))),
+ None => Ok(None),
+ });
+ Ok(PhysicalSortExpr {
+ expr: new_expr?,
+ options: e.options,
})
- .unwrap();
- PhysicalSortExpr {
- expr: new_expr,
- options: e.options,
- }
- })
- .collect::<Vec<_>>()
- })
+ })
+ .collect();
+ new_sort_exprs
+ })
+ .map_or(Ok(None), |v| v.map(Some))?
}
JoinType::Full => None,
};
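The trailing `.map_or(Ok(None), |v| v.map(Some))?` in this hunk is the standard transpose from Option<Result<T>> to Result<Option<T>>: a missing ordering stays None, a successful rewrite becomes Some, and any error escapes with `?`. A small sketch of the same shape (illustrative values; std's Option::transpose() is the equivalent built-in):

    use std::error::Error;

    type Result<T> = std::result::Result<T, Box<dyn Error>>;

    fn double_all(input: Option<Vec<i64>>) -> Result<Option<Vec<i64>>> {
        input
            .map(|values| {
                values
                    .iter()
                    .map(|v: &i64| -> Result<i64> {
                        if *v < 0 {
                            Err("negative value".into())
                        } else {
                            Ok(v * 2)
                        }
                    })
                    // Result<Vec<_>> short-circuits on the first Err
                    .collect::<Result<Vec<i64>>>()
            })
            // Option<Result<Vec<i64>>> -> Result<Option<Vec<i64>>>
            .map_or(Ok(None), |v| v.map(Some))
    }

    fn main() -> Result<()> {
        assert_eq!(double_all(None)?, None);
        assert_eq!(double_all(Some(vec![1, 2]))?, Some(vec![2, 4]));
        assert!(double_all(Some(vec![-1])).is_err());
        Ok(())
    }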
diff --git a/datafusion/core/src/physical_plan/joins/utils.rs b/datafusion/core/src/physical_plan/joins/utils.rs
index cc71ee6af..936b9343b 100644
--- a/datafusion/core/src/physical_plan/joins/utils.rs
+++ b/datafusion/core/src/physical_plan/joins/utils.rs
@@ -123,11 +123,11 @@ pub fn adjust_right_output_partitioning(
.into_iter()
.map(|expr| {
expr.transform_down(&|e| match e.as_any().downcast_ref::<Column>() {
- Some(col) => Some(Arc::new(Column::new(
+ Some(col) => Ok(Some(Arc::new(Column::new(
col.name(),
left_columns_len + col.index(),
- ))),
- None => None,
+ )))),
+ None => Ok(None),
})
.unwrap()
})
diff --git a/datafusion/core/src/physical_plan/rewrite.rs b/datafusion/core/src/physical_plan/rewrite.rs
index a431dd31a..2972b546b 100644
--- a/datafusion/core/src/physical_plan/rewrite.rs
+++ b/datafusion/core/src/physical_plan/rewrite.rs
@@ -78,7 +78,7 @@ pub trait TreeNodeRewritable: Clone {
/// The default tree traversal direction is transform_up(Postorder Traversal).
fn transform<F>(self, op: &F) -> Result<Self>
where
- F: Fn(Self) -> Option<Self>,
+ F: Fn(Self) -> Result<Option<Self>>,
{
self.transform_up(op)
}
@@ -88,10 +88,10 @@ pub trait TreeNodeRewritable: Clone {
/// When the `op` does not apply to a given node, it is left unchanged.
fn transform_down<F>(self, op: &F) -> Result<Self>
where
- F: Fn(Self) -> Option<Self>,
+ F: Fn(Self) -> Result<Option<Self>>,
{
let node_cloned = self.clone();
- let after_op = match op(node_cloned) {
+ let after_op = match op(node_cloned)? {
Some(value) => value,
None => self,
};
@@ -103,12 +103,12 @@ pub trait TreeNodeRewritable: Clone {
/// When the `op` does not apply to a given node, it is left unchanged.
fn transform_up<F>(self, op: &F) -> Result<Self>
where
- F: Fn(Self) -> Option<Self>,
+ F: Fn(Self) -> Result<Option<Self>>,
{
let after_op_children = self.map_children(|node| node.transform_up(op))?;
let after_op_children_clone = after_op_children.clone();
- let new_node = match op(after_op_children) {
+ let new_node = match op(after_op_children)? {
Some(value) => value,
None => after_op_children_clone,
};
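With the trait change above, transform_up rewrites children first (post-order) and then offers the result to the fallible closure, keeping the node unchanged when the closure returns Ok(None). A condensed, self-contained sketch of that shape over a toy expression tree (simplified: map_children is inlined and the node type is not ExecutionPlan):

    use std::error::Error;

    type Result<T> = std::result::Result<T, Box<dyn Error>>;

    #[derive(Clone, Debug, PartialEq)]
    enum Node {
        Leaf(i64),
        Add(Box<Node>, Box<Node>),
    }

    impl Node {
        // Post-order traversal, mirroring transform_up: rewrite the
        // children first, then offer the rewritten node to `op`.
        fn transform_up<F>(self, op: &F) -> Result<Node>
        where
            F: Fn(Node) -> Result<Option<Node>>,
        {
            let after_children = match self {
                Node::Leaf(v) => Node::Leaf(v),
                Node::Add(l, r) => Node::Add(
                    Box::new(l.transform_up(op)?),
                    Box::new(r.transform_up(op)?),
                ),
            };
            // Ok(None) from `op` means "leave this node unchanged".
            Ok(op(after_children.clone())?.unwrap_or(after_children))
        }
    }

    fn main() -> Result<()> {
        let tree = Node::Add(Box::new(Node::Leaf(1)), Box::new(Node::Leaf(2)));
        // Constant-fold Add(Leaf, Leaf); fail on negative leaves.
        let folded = tree.transform_up(&|n| match n {
            Node::Leaf(v) if v < 0 => Err("negative leaf".into()),
            Node::Add(l, r) => match (*l, *r) {
                (Node::Leaf(a), Node::Leaf(b)) => Ok(Some(Node::Leaf(a + b))),
                (l, r) => Ok(Some(Node::Add(Box::new(l), Box::new(r)))),
            },
            _ => Ok(None),
        })?;
        assert_eq!(folded, Node::Leaf(3));
        Ok(())
    }

Compared with the old Fn(Self) -> Option<Self> bound, a failing rewrite now surfaces as an Err at the top-level call instead of panicking mid-traversal.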
diff --git a/datafusion/physical-expr/src/expressions/case.rs b/datafusion/physical-expr/src/expressions/case.rs
index ddf1d6bae..f499f5ebf 100644
--- a/datafusion/physical-expr/src/expressions/case.rs
+++ b/datafusion/physical-expr/src/expressions/case.rs
@@ -874,11 +874,11 @@ mod tests {
&|e| match e.as_any().downcast_ref::<crate::expressions::Literal>() {
Some(lit_value) => match lit_value.value() {
ScalarValue::Utf8(Some(str_value)) => {
- Some(lit(str_value.to_uppercase()))
+ Ok(Some(lit(str_value.to_uppercase())))
}
- _ => None,
+ _ => Ok(None),
},
- _ => None,
+ _ => Ok(None),
},
)
.unwrap();
@@ -891,11 +891,11 @@ mod tests {
{
Some(lit_value) => match lit_value.value() {
ScalarValue::Utf8(Some(str_value)) => {
- Some(lit(str_value.to_uppercase()))
+ Ok(Some(lit(str_value.to_uppercase())))
}
- _ => None,
+ _ => Ok(None),
},
- _ => None,
+ _ => Ok(None),
})
.unwrap();
diff --git a/datafusion/physical-expr/src/rewrite.rs b/datafusion/physical-expr/src/rewrite.rs
index 487d08455..327eabd4b 100644
--- a/datafusion/physical-expr/src/rewrite.rs
+++ b/datafusion/physical-expr/src/rewrite.rs
@@ -76,7 +76,7 @@ pub trait TreeNodeRewritable: Clone {
/// The default tree traversal direction is transform_up(Postorder Traversal).
fn transform<F>(self, op: &F) -> Result<Self>
where
- F: Fn(Self) -> Option<Self>,
+ F: Fn(Self) -> Result<Option<Self>>,
{
self.transform_up(op)
}
@@ -86,10 +86,10 @@ pub trait TreeNodeRewritable: Clone {
/// When the `op` does not apply to a given node, it is left unchanged.
fn transform_down<F>(self, op: &F) -> Result<Self>
where
- F: Fn(Self) -> Option<Self>,
+ F: Fn(Self) -> Result<Option<Self>>,
{
let node_cloned = self.clone();
- let after_op = match op(node_cloned) {
+ let after_op = match op(node_cloned)? {
Some(value) => value,
None => self,
};
@@ -101,12 +101,12 @@ pub trait TreeNodeRewritable: Clone {
/// When the `op` does not apply to a given node, it is left unchanged.
fn transform_up<F>(self, op: &F) -> Result<Self>
where
- F: Fn(Self) -> Option<Self>,
+ F: Fn(Self) -> Result<Option<Self>>,
{
let after_op_children = self.map_children(|node| node.transform_up(op))?;
let after_op_children_clone = after_op_children.clone();
- let new_node = match op(after_op_children) {
+ let new_node = match op(after_op_children)? {
Some(value) => value,
None => after_op_children_clone,
};
diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs
index 78ce9c931..ea64d7397 100644
--- a/datafusion/physical-expr/src/utils.rs
+++ b/datafusion/physical-expr/src/utils.rs
@@ -141,7 +141,7 @@ pub fn normalize_out_expr_with_alias_schema(
}
None => None,
};
- normalized_form
+ Ok(normalized_form)
})
.unwrap_or(expr)
}
@@ -161,9 +161,9 @@ pub fn normalize_expr_with_equivalence_properties(
break;
}
}
- normalized
+ Ok(normalized)
}
- None => None,
+ None => Ok(None),
})
.unwrap_or(expr)
}
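One detail worth noting in these last two utils.rs helpers: the outer call is `expr.clone().transform(...).unwrap_or(expr)`, and with transform now returning Result, Result::unwrap_or falls back to the original expression if normalization fails rather than panicking. A tiny sketch of that fallback shape (hypothetical normalize function, not the DataFusion one):

    use std::error::Error;

    type Result<T> = std::result::Result<T, Box<dyn Error>>;

    // Hypothetical fallible normalization, standing in for the transform call.
    fn normalize(expr: String) -> Result<String> {
        if expr.is_empty() {
            return Err("empty expression".into());
        }
        Ok(expr.to_uppercase())
    }

    fn main() {
        // On success the normalized form is used...
        let expr = String::from("a + b");
        assert_eq!(normalize(expr.clone()).unwrap_or(expr), "A + B");

        // ...and on failure Result::unwrap_or keeps the original expression,
        // mirroring the `.unwrap_or(expr)` calls above.
        let empty = String::new();
        assert_eq!(normalize(empty.clone()).unwrap_or(empty), "");
    }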