You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2023/03/25 13:30:54 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #5661: Top down `EnforceSorting`, Extended testbench for `EnforceSorting` rule to prepare for refactors, additional functionality such as pushdowns over unions

alamb commented on code in PR #5661:
URL: https://github.com/apache/arrow-datafusion/pull/5661#discussion_r1148365069


##########
datafusion/core/src/physical_optimizer/dist_enforcement.rs:
##########
@@ -2104,7 +2080,6 @@ mod tests {
         // The optimizer should not add an additional SortExec as the
         // data is already sorted
         let expected = &[

Review Comment:
   Why was the `SortPreservingMergeExec` removed? Is it because there is a single partition in the input? 
   
   I think the intent of this test was to ensure that when a `SortPreservingMergeExec` is present, a new sort is not added.
   
   Would it be possible to update the test so that the `SortPreservingMergeExec` is still present (perhaps by increasing the number of input partitions)? 
   
   Perhaps by using the same strategy as `parquet_exec_multiple_sorted`?
   
   



##########
datafusion/physical-expr/src/sort_expr.rs:
##########
@@ -69,4 +62,70 @@ impl PhysicalSortExpr {
             options: Some(self.options),
         })
     }
+
+    pub fn satisfy(&self, requirement: &PhysicalSortRequirement) -> bool {
+        self.expr.eq(&requirement.expr)
+            && requirement
+                .options
+                .map_or(true, |opts| self.options == opts)
+    }
+}
+
+/// Represents sort requirement associated with a plan
+#[derive(Clone, Debug)]
+pub struct PhysicalSortRequirement {
+    /// Physical expression representing the column to sort

Review Comment:
   Is `expr` always a `PhysicalSortExpr`? If so, the `options` on this structure might be redundant with the `options` on that `PhysicalSortExpr`.
   
   https://docs.rs/datafusion/latest/datafusion/physical_plan/expressions/struct.PhysicalSortExpr.html



##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -115,33 +118,21 @@ pub fn normalize_out_expr_with_alias_schema(
     alias_map: &HashMap<Column, Vec<Column>>,
     schema: &SchemaRef,
 ) -> Arc<dyn PhysicalExpr> {
-    let expr_clone = expr.clone();
-    expr_clone
+    expr.clone()
         .transform(&|expr| {
-            let normalized_form: Option<Arc<dyn PhysicalExpr>> =
-                match expr.as_any().downcast_ref::<Column>() {
-                    Some(column) => {
-                        let out = alias_map
-                            .get(column)
-                            .map(|c| {
-                                let out_col: Arc<dyn PhysicalExpr> =
-                                    Arc::new(c[0].clone());
-                                out_col
-                            })
-                            .or_else(|| match schema.index_of(column.name()) {
-                                // Exactly matching, return None, no need to do the transform
-                                Ok(idx) if column.index() == idx => None,
-                                _ => {
-                                    let out_col: Arc<dyn PhysicalExpr> =
-                                        Arc::new(UnKnownColumn::new(column.name()));
-                                    Some(out_col)
-                                }
-                            });
-                        out
-                    }
-                    None => None,
-                };
-            Ok(normalized_form)
+            Ok(match expr.as_any().downcast_ref::<Column>() {

Review Comment:
   This is a "drive-by" cleanup, right? That is, it is not related to the rest of the changes in this PR?



##########
datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs:
##########
@@ -123,7 +122,7 @@ impl BoundedWindowAggExec {
         let mut result = vec![];
         // All window exprs have the same partition by, so we just use the first one:
         let partition_by = self.window_expr()[0].partition_by();
-        let sort_keys = self.sort_keys.as_deref().unwrap_or(&[]);
+        let sort_keys = self.input.output_ordering().unwrap_or(&[]);

Review Comment:
   This makes sense to me -- specifically, `BoundedWindowAggExec` does not change the ordering of its input, so there is no reason to store the sort keys directly.



##########
datafusion/physical-expr/src/sort_expr.rs:
##########
@@ -69,4 +62,70 @@ impl PhysicalSortExpr {
             options: Some(self.options),
         })
     }
+
+    pub fn satisfy(&self, requirement: &PhysicalSortRequirement) -> bool {
+        self.expr.eq(&requirement.expr)
+            && requirement
+                .options
+                .map_or(true, |opts| self.options == opts)
+    }
+}
+
+/// Represents sort requirement associated with a plan
+#[derive(Clone, Debug)]
+pub struct PhysicalSortRequirement {
+    /// Physical expression representing the column to sort
+    pub expr: Arc<dyn PhysicalExpr>,
+    /// Option to specify how the given column should be sorted.
+    /// If unspecified, there is no constraint on sort options.
+    pub options: Option<SortOptions>,
+}
+
+impl From<PhysicalSortExpr> for PhysicalSortRequirement {
+    fn from(value: PhysicalSortExpr) -> Self {
+        Self {
+            expr: value.expr,
+            options: Some(value.options),
+        }
+    }
+}
+
+impl PartialEq for PhysicalSortRequirement {
+    fn eq(&self, other: &PhysicalSortRequirement) -> bool {
+        self.options == other.options && self.expr.eq(&other.expr)
+    }
+}
+
+impl std::fmt::Display for PhysicalSortRequirement {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        let opts_string = self.options.as_ref().map_or("NA", to_string);
+        write!(f, "{} {}", self.expr, opts_string)
+    }
+}
+
+impl PhysicalSortRequirement {
+    /// Returns whether this requirement is equal or more specific than `other`.
+    pub fn compatible(&self, other: &PhysicalSortRequirement) -> bool {
+        self.expr.eq(&other.expr)
+            && other.options.map_or(true, |other_opts| {
+                self.options.map_or(false, |opts| opts == other_opts)
+            })
+    }
+}
+
+pub fn make_sort_requirements_from_exprs(
+    ordering: &[PhysicalSortExpr],
+) -> Vec<PhysicalSortRequirement> {
+    ordering.iter().map(|e| e.clone().into()).collect()
+}
+
+/// Returns the SQL string representation of the given [SortOptions] object.
+#[inline]
+fn to_string(options: &SortOptions) -> &str {

Review Comment:
   ```suggestion
   fn to_str(options: &SortOptions) -> &str {
   ```
   
   Given that it returns a `&str`, I think calling it `to_string` is slightly misleading (by convention, `to_string` returns an owned `String`).



##########
datafusion/physical-expr/src/sort_expr.rs:
##########
@@ -69,4 +62,70 @@ impl PhysicalSortExpr {
             options: Some(self.options),
         })
     }
+
+    pub fn satisfy(&self, requirement: &PhysicalSortRequirement) -> bool {
+        self.expr.eq(&requirement.expr)
+            && requirement
+                .options
+                .map_or(true, |opts| self.options == opts)
+    }
+}
+
+/// Represents sort requirement associated with a plan
+#[derive(Clone, Debug)]
+pub struct PhysicalSortRequirement {
+    /// Physical expression representing the column to sort
+    pub expr: Arc<dyn PhysicalExpr>,
+    /// Option to specify how the given column should be sorted.
+    /// If unspecified, there is no constraint on sort options.
+    pub options: Option<SortOptions>,
+}
+
+impl From<PhysicalSortExpr> for PhysicalSortRequirement {
+    fn from(value: PhysicalSortExpr) -> Self {
+        Self {
+            expr: value.expr,
+            options: Some(value.options),
+        }
+    }
+}
+
+impl PartialEq for PhysicalSortRequirement {
+    fn eq(&self, other: &PhysicalSortRequirement) -> bool {
+        self.options == other.options && self.expr.eq(&other.expr)
+    }
+}
+
+impl std::fmt::Display for PhysicalSortRequirement {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        let opts_string = self.options.as_ref().map_or("NA", to_string);
+        write!(f, "{} {}", self.expr, opts_string)
+    }
+}
+
+impl PhysicalSortRequirement {
+    /// Returns whether this requirement is equal or more specific than `other`.
+    pub fn compatible(&self, other: &PhysicalSortRequirement) -> bool {
+        self.expr.eq(&other.expr)
+            && other.options.map_or(true, |other_opts| {
+                self.options.map_or(false, |opts| opts == other_opts)
+            })
+    }
+}
+
+pub fn make_sort_requirements_from_exprs(
+    ordering: &[PhysicalSortExpr],
+) -> Vec<PhysicalSortRequirement> {
+    ordering.iter().map(|e| e.clone().into()).collect()
+}
+
+/// Returns the SQL string representation of the given [SortOptions] object.
+#[inline]
+fn to_string(options: &SortOptions) -> &str {

Review Comment:
   An `impl Display` for `SortOptions` might be a nice thing to contribute upstream to arrow-rs eventually.



##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -210,31 +219,176 @@ pub fn ordering_satisfy_concrete<F: FnOnce() -> EquivalenceProperties>(
     } else if required
         .iter()
         .zip(provided.iter())
-        .all(|(order1, order2)| order1.eq(order2))
+        .all(|(req, given)| req.eq(given))
     {
         true
     } else if let eq_classes @ [_, ..] = equal_properties().classes() {
-        let normalized_required_exprs = required
+        required
             .iter()
             .map(|e| {
                 normalize_sort_expr_with_equivalence_properties(e.clone(), eq_classes)
             })
-            .collect::<Vec<_>>();
-        let normalized_provided_exprs = provided
+            .zip(provided.iter().map(|e| {
+                normalize_sort_expr_with_equivalence_properties(e.clone(), eq_classes)
+            }))
+            .all(|(req, given)| req.eq(&given))
+    } else {
+        false
+    }
+}
+
+/// Checks whether the given [`PhysicalSortRequirement`]s are satisfied by the
+/// provided [`PhysicalSortExpr`]s.
+pub fn ordering_satisfy_requirement<F: FnOnce() -> EquivalenceProperties>(
+    provided: Option<&[PhysicalSortExpr]>,
+    required: Option<&[PhysicalSortRequirement]>,
+    equal_properties: F,
+) -> bool {
+    match (provided, required) {
+        (_, None) => true,
+        (None, Some(_)) => false,
+        (Some(provided), Some(required)) => {
+            ordering_satisfy_requirement_concrete(provided, required, equal_properties)
+        }
+    }
+}
+
+/// Checks whether the given [`PhysicalSortRequirement`]s are satisfied by the
+/// provided [`PhysicalSortExpr`]s.
+pub fn ordering_satisfy_requirement_concrete<F: FnOnce() -> EquivalenceProperties>(
+    provided: &[PhysicalSortExpr],
+    required: &[PhysicalSortRequirement],
+    equal_properties: F,
+) -> bool {
+    if required.len() > provided.len() {
+        false
+    } else if required
+        .iter()
+        .zip(provided.iter())
+        .all(|(req, given)| given.satisfy(req))
+    {
+        true
+    } else if let eq_classes @ [_, ..] = equal_properties().classes() {
+        required
             .iter()
             .map(|e| {
-                normalize_sort_expr_with_equivalence_properties(e.clone(), eq_classes)
+                normalize_sort_requirement_with_equivalence_properties(
+                    e.clone(),
+                    eq_classes,
+                )
             })
-            .collect::<Vec<_>>();
-        normalized_required_exprs
+            .zip(provided.iter().map(|e| {
+                normalize_sort_expr_with_equivalence_properties(e.clone(), eq_classes)
+            }))
+            .all(|(req, given)| given.satisfy(&req))
+    } else {
+        false
+    }
+}
+
+/// Checks whether the given [`PhysicalSortRequirement`]s are equal or more
+/// specific than the provided [`PhysicalSortRequirement`]s.
+pub fn requirements_compatible<F: FnOnce() -> EquivalenceProperties>(
+    provided: Option<&[PhysicalSortRequirement]>,
+    required: Option<&[PhysicalSortRequirement]>,
+    equal_properties: F,
+) -> bool {
+    match (provided, required) {
+        (_, None) => true,
+        (None, Some(_)) => false,
+        (Some(provided), Some(required)) => {
+            requirements_compatible_concrete(provided, required, equal_properties)
+        }
+    }
+}
+
+/// Checks whether the given [`PhysicalSortRequirement`]s are equal or more
+/// specific than the provided [`PhysicalSortRequirement`]s.
+fn requirements_compatible_concrete<F: FnOnce() -> EquivalenceProperties>(
+    provided: &[PhysicalSortRequirement],
+    required: &[PhysicalSortRequirement],
+    equal_properties: F,
+) -> bool {
+    if required.len() > provided.len() {
+        false
+    } else if required
+        .iter()
+        .zip(provided.iter())
+        .all(|(req, given)| given.compatible(req))
+    {
+        true
+    } else if let eq_classes @ [_, ..] = equal_properties().classes() {
+        required
             .iter()
-            .zip(normalized_provided_exprs.iter())
-            .all(|(order1, order2)| order1.eq(order2))
+            .map(|e| {
+                normalize_sort_requirement_with_equivalence_properties(
+                    e.clone(),
+                    eq_classes,
+                )
+            })
+            .zip(provided.iter().map(|e| {
+                normalize_sort_requirement_with_equivalence_properties(
+                    e.clone(),
+                    eq_classes,
+                )
+            }))
+            .all(|(req, given)| given.compatible(&req))
     } else {
         false
     }
 }
 
+pub fn map_columns_before_projection(

Review Comment:
   Can you please add a doc comment describing what this function is supposed to do?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org