You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "mustafasrepo (via GitHub)" <gi...@apache.org> on 2023/02/21 06:46:00 UTC

[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #5290: TopDown EnforceSorting implementation

mustafasrepo commented on code in PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#discussion_r1112608135


##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -235,6 +268,186 @@ pub fn ordering_satisfy_concrete<F: FnOnce() -> EquivalenceProperties>(
     }
 }
 
+/// Checks whether the required ordering requirements are satisfied by the provided [PhysicalSortExpr]s.
+pub fn ordering_satisfy_requirement<F: FnOnce() -> EquivalenceProperties>(
+    provided: Option<&[PhysicalSortExpr]>,
+    required: Option<&[PhysicalSortRequirements]>,
+    equal_properties: F,
+) -> bool {
+    match (provided, required) {
+        (_, None) => true,
+        (None, Some(_)) => false,
+        (Some(provided), Some(required)) => {
+            ordering_satisfy_requirement_concrete(provided, required, equal_properties)
+        }
+    }
+}
+
+pub fn ordering_satisfy_requirement_concrete<F: FnOnce() -> EquivalenceProperties>(
+    provided: &[PhysicalSortExpr],
+    required: &[PhysicalSortRequirements],
+    equal_properties: F,
+) -> bool {
+    if required.len() > provided.len() {
+        false
+    } else if required
+        .iter()
+        .zip(provided.iter())
+        .all(|(order1, order2)| order2.satisfy(order1))
+    {
+        true
+    } else if let eq_classes @ [_, ..] = equal_properties().classes() {
+        let normalized_requirements = required
+            .iter()
+            .map(|e| {
+                normalize_sort_requirement_with_equivalence_properties(
+                    e.clone(),
+                    eq_classes,
+                )
+            })
+            .collect::<Vec<_>>();
+        let normalized_provided_exprs = provided
+            .iter()
+            .map(|e| {
+                normalize_sort_expr_with_equivalence_properties(e.clone(), eq_classes)
+            })
+            .collect::<Vec<_>>();
+        normalized_requirements
+            .iter()
+            .zip(normalized_provided_exprs.iter())
+            .all(|(order1, order2)| order2.satisfy(order1))
+    } else {
+        false
+    }
+}
+
+/// Provided requirements are compatible with the required, which means the provided requirements are equal or more specific than the required
+pub fn requirements_compatible<F: FnOnce() -> EquivalenceProperties>(
+    provided: Option<&[PhysicalSortRequirements]>,
+    required: Option<&[PhysicalSortRequirements]>,
+    equal_properties: F,
+) -> bool {
+    match (provided, required) {
+        (_, None) => true,
+        (None, Some(_)) => false,
+        (Some(provided), Some(required)) => {
+            if required.len() > provided.len() {
+                false
+            } else if required
+                .iter()
+                .zip(provided.iter())
+                .all(|(req, pro)| pro.compatible(req))
+            {
+                true
+            } else if let eq_classes @ [_, ..] = equal_properties().classes() {
+                let normalized_required = required
+                    .iter()
+                    .map(|e| {
+                        normalize_sort_requirement_with_equivalence_properties(
+                            e.clone(),
+                            eq_classes,
+                        )
+                    })
+                    .collect::<Vec<_>>();
+                let normalized_provided = provided
+                    .iter()
+                    .map(|e| {
+                        normalize_sort_requirement_with_equivalence_properties(
+                            e.clone(),
+                            eq_classes,
+                        )
+                    })
+                    .collect::<Vec<_>>();
+                normalized_required
+                    .iter()
+                    .zip(normalized_provided.iter())
+                    .all(|(req, pro)| pro.compatible(req))
+            } else {
+                false
+            }
+        }
+    }
+}
+
+pub fn map_columns_before_projection(
+    parent_required: &[Arc<dyn PhysicalExpr>],
+    proj_exprs: &[(Arc<dyn PhysicalExpr>, String)],
+) -> Vec<Arc<dyn PhysicalExpr>> {
+    let mut column_mapping = HashMap::new();
+    for (expression, name) in proj_exprs.iter() {
+        if let Some(column) = expression.as_any().downcast_ref::<Column>() {
+            column_mapping.insert(name.clone(), column.clone());
+        };
+    }
+    let new_required: Vec<Arc<dyn PhysicalExpr>> = parent_required
+        .iter()
+        .filter_map(|r| {
+            if let Some(column) = r.as_any().downcast_ref::<Column>() {
+                column_mapping.get(column.name())
+            } else {
+                None
+            }
+        })
+        .map(|e| Arc::new(e.clone()) as Arc<dyn PhysicalExpr>)
+        .collect::<Vec<_>>();
+    new_required
+}
+
+pub fn map_requirement_before_projection(
+    parent_required: Option<&[PhysicalSortRequirements]>,
+    proj_exprs: &[(Arc<dyn PhysicalExpr>, String)],
+) -> Option<Vec<PhysicalSortRequirements>> {
+    if let Some(requirement) = parent_required {
+        let required_expr = create_sort_expr_from_requirement(requirement)
+            .iter()
+            .map(|sort_expr| sort_expr.expr.clone())
+            .collect::<Vec<_>>();
+        let new_exprs = map_columns_before_projection(&required_expr, proj_exprs);
+        if new_exprs.len() == requirement.len() {
+            let new_request = new_exprs
+                .iter()
+                .zip(requirement.iter())
+                .map(|(new, old)| PhysicalSortRequirements {
+                    expr: new.clone(),
+                    sort_options: old.sort_options,
+                })
+                .collect::<Vec<_>>();
+            Some(new_request)
+        } else {
+            None
+        }
+    } else {
+        None
+    }
+}
+
+pub fn create_sort_expr_from_requirement(
+    required: &[PhysicalSortRequirements],
+) -> Vec<PhysicalSortExpr> {
+    let parent_required_expr = required
+        .iter()
+        .map(|prop| {
+            if prop.sort_options.is_some() {
+                PhysicalSortExpr {
+                    expr: prop.expr.clone(),
+                    options: prop.sort_options.unwrap(),
+                }
+            } else {

Review Comment:
   I think we can use `If let Some(sort_options) = prop.sort_options`  idiom here. This would remove `.unwrap()`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org