You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/10/18 13:25:22 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #3855: [Phase1] Partition and Sort Enforcement rule

alamb commented on code in PR #3855:
URL: https://github.com/apache/arrow-datafusion/pull/3855#discussion_r998170962


##########
datafusion-cli/src/object_storage.rs:
##########
@@ -68,7 +68,7 @@ fn build_gcs_object_store(url: &Url) -> Result<Arc<dyn object_store::ObjectStore
     let host = get_host_name(url)?;
     let mut builder = GoogleCloudStorageBuilder::new().with_bucket_name(host);
 
-    if let Some(path) = env::var("GCP_SERVICE_ACCOUNT_PATH").ok() {
+    if let Ok(path) = env::var("GCP_SERVICE_ACCOUNT_PATH") {

Review Comment:
   These are nice cleanups, but they are not directly connected to the other tickets in this PR. Were they suggested by clippy, or are they just things you noticed while reviewing the code?



##########
datafusion/physical-expr/src/expressions/column.rs:
##########
@@ -89,6 +91,26 @@ impl PhysicalExpr for Column {
         self.bounds_check(batch.schema().as_ref())?;
         Ok(ColumnarValue::Array(batch.column(self.index).clone()))
     }
+
+    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn PhysicalExpr>>,
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        Ok(self)
+    }
+}
+
+impl PartialEq<dyn Any> for Column {
+    fn eq(&self, other: &dyn Any) -> bool {

Review Comment:
   👍 



##########
datafusion/core/src/physical_plan/mod.rs:
##########
@@ -487,6 +574,82 @@ impl Partitioning {
             RoundRobinBatch(n) | Hash(_, n) | UnknownPartitioning(n) => *n,
         }
     }
+
+    /// Returns true when the guarantees made by this [[Partitioning]] are sufficient to
+    /// satisfy the partitioning scheme mandated by the `required` [[Distribution]]
+    pub fn satisfy<F: FnOnce() -> Vec<Vec<Column>>>(
+        &self,
+        required: Distribution,
+        equal_properties: F,
+    ) -> bool {
+        match required {
+            Distribution::UnspecifiedDistribution => true,
+            Distribution::SinglePartition if self.partition_count() == 1 => true,
+            Distribution::HashPartitioned(required_exprs) => {
+                match self {
+                    // Here we do not check the partition count for hash partitioning and assumes the partition count
+                    // and hash functions in the system are the same. In future if we plan to support storage partition-wise joins,
+                    // then we need to have the partition count and hash functions validation.
+                    Partitioning::Hash(partition_exprs, _) => {
+                        let fast_match =
+                            expr_list_eq_any_order(&required_exprs, partition_exprs);
+                        // If the required exprs do not match, need to leverage the eq_properties provided by the child
+                        // and normalize both exprs based on the eq_properties
+                        if !fast_match {
+                            let eq_properties = equal_properties();
+                            if !eq_properties.is_empty() {
+                                let normalized_required_exprs = required_exprs
+                                    .iter()
+                                    .map(|e| {
+                                        normalize_expr_with_equivalence_properties(
+                                            e.clone(),
+                                            &eq_properties,
+                                        )
+                                    })
+                                    .collect::<Vec<_>>();
+                                let normalized_partition_exprs = partition_exprs
+                                    .iter()
+                                    .map(|e| {
+                                        normalize_expr_with_equivalence_properties(
+                                            e.clone(),
+                                            &eq_properties,
+                                        )
+                                    })
+                                    .collect::<Vec<_>>();
+                                expr_list_eq_any_order(
+                                    &normalized_required_exprs,
+                                    &normalized_partition_exprs,
+                                )
+                            } else {
+                                fast_match
+                            }
+                        } else {
+                            fast_match
+                        }
+                    }
+                    _ => false,
+                }
+            }
+            _ => false,
+        }
+    }
+}
+
+impl PartialEq for Partitioning {
+    fn eq(&self, other: &Partitioning) -> bool {
+        match (self, other) {
+            (
+                Partitioning::RoundRobinBatch(count1),
+                Partitioning::RoundRobinBatch(count2),
+            ) if count1 == count2 => true,
+            (Partitioning::Hash(exprs1, count1), Partitioning::Hash(exprs2, count2))
+                if expr_list_eq_any_order(exprs1, exprs2) && (count1 == count2) =>

Review Comment:
   I don't know if this is true for our hash algorithm -- are we sure that `hash(expr1, expr2)` is equivalent to `hash(expr2, expr1)`?



##########
datafusion/core/src/physical_optimizer/enforcement.rs:
##########
@@ -0,0 +1,858 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Enforcement optimizer rules are used to make sure the plan's Distribution and Ordering
+//! requirements are met by inserting necessary [[RepartitionExec]] and [[SortExec]].
+//!
+use crate::error::Result;
+use crate::physical_optimizer::utils::transform_up;
+use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use crate::physical_plan::repartition::RepartitionExec;
+use crate::physical_plan::sorts::sort::SortExec;
+use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan};
+use crate::prelude::SessionConfig;
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::{
+    normalize_sort_expr_with_equivalence_properties, PhysicalSortExpr,
+};
+use std::sync::Arc;
+
+/// BasicEnforcement rule, it ensures the Distribution and Ordering requirements are met
+/// in the strictest way. It might add additional [[RepartitionExec]] to the plan tree
+/// and give a non-optimal plan, but it can avoid the possible data skew in joins
+///
+/// For example for a HashJoin with keys(a, b, c), the required Distribution(a, b, c) can be satisfied by
+/// several alternative partitioning ways: [(a, b, c), (a, b), (a, c), (b, c), (a), (b), (c), ( )].
+///
+/// This rule only chooses the exactly match and satisfies the Distribution(a, b, c) by a HashPartition(a, b, c).
+#[derive(Default)]
+pub struct BasicEnforcement {}

Review Comment:
   Maybe we could call this `DistributionAndOrdering` to better reflect what it does?



##########
datafusion/core/src/physical_plan/filter.rs:
##########
@@ -231,6 +245,39 @@ impl RecordBatchStream for FilterExecStream {
     }
 }
 
+/// Return the equals Column-Pairs and Non-equals Column-Pairs
+fn collect_columns_from_predicate(predicate: &Arc<dyn PhysicalExpr>) -> EqualAndNonEqual {

Review Comment:
   I think seeing examples of this code working in tests would help verify its correctness.



##########
datafusion/core/src/physical_plan/union.rs:
##########
@@ -78,10 +83,24 @@ impl UnionExec {
             inputs[0].schema().metadata().clone(),
         ));
 
+        // If all the input partitions have the same Hash partition spec with the first_input_partition

Review Comment:
   I may have missed it, but I don't see any tests for this code (either new tests or changes to existing tests).



##########
datafusion/core/src/physical_plan/mod.rs:
##########
@@ -201,15 +285,18 @@ pub trait ExecutionPlan: Debug + Send + Sync {
     ///
     /// The default implementation returns `true` unless this operator
     /// has signalled it requires a single child input partition.
-    fn benefits_from_input_partitioning(&self) -> bool {
+    fn prefer_parallel(&self) -> bool {
         // By default try to maximize parallelism with more CPUs if
-        // possible
-        !matches!(
-            self.required_child_distribution(),
-            Distribution::SinglePartition
-        )
+        // possibles
+        !self
+            .required_input_distribution()
+            .into_iter()
+            .any(|dist| matches!(dist, Distribution::SinglePartition))
     }
 
+    /// Get a list of equivalence properties within the plan

Review Comment:
   Can you please expand on this function's doc string? Specifically, it would be nice to:
   
   1. define what "equivalence" means (is it equality? Does it include nullability?)
   2. explain what the `Vec<Vec<Column>>` represents (a set of equivalence properties?). Can there be duplicates, e.g. `vec![vec![col("A"), col("B")], vec![col("A"), col("C")]]`?
   3. explain what happens if an operator does not return any equivalence properties
   
   Some examples of what changes are made based on this information would likely also be helpful.



##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Collection of utility functions for Physical Expr optimization
+
+use crate::PhysicalExpr;
+use datafusion_common::DataFusionError;
+use std::result;
+use std::sync::Arc;
+
+pub type Result<T> = result::Result<T, DataFusionError>;
+
+/// Apply transform `F` to the PhysicalExpr's children, the transform `F` might have a direction(Preorder or Postorder)
+fn map_children<F>(
+    expr: Arc<dyn PhysicalExpr>,
+    transform: F,
+) -> Result<Arc<dyn PhysicalExpr>>
+where
+    F: Fn(Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>>,
+{
+    if !expr.children().is_empty() {
+        let new_children: Result<Vec<_>> =
+            expr.children().into_iter().map(transform).collect();
+        with_new_children_if_necessary(expr, new_children?)
+    } else {
+        Ok(expr)
+    }
+}
+
+/// Convenience utils for writing optimizers rule: recursively apply the given `op` to the PhysicalExpr tree.
+/// When `op` does not apply to a given expr, it is left unchanged.
+/// The default tree traversal direction is transform_down(Preorder Traversal).
+pub fn transform<F>(expr: Arc<dyn PhysicalExpr>, op: &F) -> Result<Arc<dyn PhysicalExpr>>
+where
+    F: Fn(Arc<dyn PhysicalExpr>) -> Option<Arc<dyn PhysicalExpr>>,
+{
+    transform_down(expr, op)
+}
+
+/// Convenience utils for writing optimizers rule: recursively apply the given 'op' to the PhysicalExpr and all of its

Review Comment:
   I wonder what you would think about following the same visitor pattern used for rewriting `Expr`s, i.e. the `ExprRewriter`:
   https://github.com/apache/arrow-datafusion/blob/488b2cec3c700821dfdfece2d85c4cd7956e718d/datafusion/expr/src/expr_rewriter.rs#L46-L56 
   
   This code is straightforward and well commented; it is simply structured differently from the logical exprs and uses different terms (like `down` and `up` rather than `pre` and `post`).



##########
datafusion/physical-expr/src/expressions/in_list.rs:
##########
@@ -899,6 +914,39 @@ impl PhysicalExpr for InListExpr {
             }
         }
     }
+
+    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        let mut children = vec![];
+        children.push(self.expr.clone());
+        children.extend(self.list.clone());
+        children
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn PhysicalExpr>>,
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        in_list(
+            children[0].clone(),
+            children[1..].to_vec(),
+            &self.negated,
+            &self.input_schema,
+        )
+    }
+}
+
+impl PartialEq<dyn Any> for InListExpr {
+    fn eq(&self, other: &dyn Any) -> bool {

Review Comment:
   This is clever -- it might be worth a comment here explaining that an `InListExpr` is considered equal if all the elements in the argument list are the same, regardless of order.



##########
datafusion/core/src/physical_optimizer/utils.rs:
##########
@@ -45,3 +45,74 @@ pub fn optimize_children(
         with_new_children_if_necessary(plan, children)
     }
 }
+
+/// Apply transform `F` to the plan's children, the transform `F` might have a direction(Preorder or Postorder)
+fn map_children<F>(

Review Comment:
   How is this code different from what is in `datafusion/physical-expr/src/utils.rs`?
   



##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Collection of utility functions for Physical Expr optimization
+
+use crate::PhysicalExpr;
+use datafusion_common::DataFusionError;
+use std::result;
+use std::sync::Arc;
+
+pub type Result<T> = result::Result<T, DataFusionError>;
+
+/// Apply transform `F` to the PhysicalExpr's children, the transform `F` might have a direction(Preorder or Postorder)
+fn map_children<F>(
+    expr: Arc<dyn PhysicalExpr>,
+    transform: F,
+) -> Result<Arc<dyn PhysicalExpr>>
+where
+    F: Fn(Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>>,
+{
+    if !expr.children().is_empty() {
+        let new_children: Result<Vec<_>> =
+            expr.children().into_iter().map(transform).collect();
+        with_new_children_if_necessary(expr, new_children?)
+    } else {
+        Ok(expr)
+    }
+}
+
+/// Convenience utils for writing optimizers rule: recursively apply the given `op` to the PhysicalExpr tree.
+/// When `op` does not apply to a given expr, it is left unchanged.
+/// The default tree traversal direction is transform_down(Preorder Traversal).
+pub fn transform<F>(expr: Arc<dyn PhysicalExpr>, op: &F) -> Result<Arc<dyn PhysicalExpr>>
+where
+    F: Fn(Arc<dyn PhysicalExpr>) -> Option<Arc<dyn PhysicalExpr>>,
+{
+    transform_down(expr, op)
+}
+
+/// Convenience utils for writing optimizers rule: recursively apply the given 'op' to the PhysicalExpr and all of its
+/// children(Preorder Traversal). When the `op` does not apply to a given PhysicalExpr, it is left unchanged.
+pub fn transform_down<F>(
+    expr: Arc<dyn PhysicalExpr>,
+    op: &F,
+) -> Result<Arc<dyn PhysicalExpr>>
+where
+    F: Fn(Arc<dyn PhysicalExpr>) -> Option<Arc<dyn PhysicalExpr>>,
+{
+    let expr_cloned = expr.clone();
+    let after_op = match op(expr_cloned) {
+        Some(value) => value,
+        None => expr,
+    };
+    map_children(after_op.clone(), |expr: Arc<dyn PhysicalExpr>| {
+        transform_down(expr, op)
+    })
+}
+
+/// Convenience utils for writing optimizers rule: recursively apply the given 'op' first to all of its
+/// children and then itself(Postorder Traversal). When the `op` does not apply to a given PhysicalExpr, it is left unchanged.
+#[allow(dead_code)]
+pub fn transform_up<F>(
+    expr: Arc<dyn PhysicalExpr>,
+    op: &F,
+) -> Result<Arc<dyn PhysicalExpr>>
+where
+    F: Fn(Arc<dyn PhysicalExpr>) -> Option<Arc<dyn PhysicalExpr>>,
+{
+    let after_op_children =
+        map_children(expr, |expr: Arc<dyn PhysicalExpr>| transform_up(expr, op))?;
+
+    let after_op_children_clone = after_op_children.clone();
+    let new_expr = match op(after_op_children) {
+        Some(value) => value,
+        None => after_op_children_clone,
+    };
+    Ok(new_expr)
+}
+
+/// Returns a copy of this expr if we change any child according to the pointer comparison.
+/// The size of `children` must be equal to the size of `PhysicalExpr::children()`.
+/// Allow the vtable address comparisons for PhysicalExpr Trait Objects,it is harmless even
+/// in the case of 'false-native'.
+#[allow(clippy::vtable_address_comparisons)]
+pub fn with_new_children_if_necessary(
+    expr: Arc<dyn PhysicalExpr>,
+    children: Vec<Arc<dyn PhysicalExpr>>,
+) -> Result<Arc<dyn PhysicalExpr>> {
+    if children.len() != expr.children().len() {
+        Err(DataFusionError::Internal(
+            "PhysicalExpr: Wrong number of children".to_string(),
+        ))
+    } else if children.is_empty()
+        || children
+            .iter()
+            .zip(expr.children().iter())
+            .any(|(c1, c2)| !Arc::ptr_eq(c1, c2))
+    {
+        expr.with_new_children(children)
+    } else {
+        Ok(expr)
+    }
+}

Review Comment:
   Some unit tests (or doctests) for these functions would be good.



##########
datafusion/core/src/physical_optimizer/enforcement.rs:
##########
@@ -0,0 +1,858 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Enforcement optimizer rules are used to make sure the plan's Distribution and Ordering
+//! requirements are met by inserting necessary [[RepartitionExec]] and [[SortExec]].
+//!
+use crate::error::Result;
+use crate::physical_optimizer::utils::transform_up;
+use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use crate::physical_plan::repartition::RepartitionExec;
+use crate::physical_plan::sorts::sort::SortExec;
+use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan};
+use crate::prelude::SessionConfig;
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::{
+    normalize_sort_expr_with_equivalence_properties, PhysicalSortExpr,
+};
+use std::sync::Arc;
+
+/// BasicEnforcement rule, it ensures the Distribution and Ordering requirements are met
+/// in the strictest way. It might add additional [[RepartitionExec]] to the plan tree
+/// and give a non-optimal plan, but it can avoid the possible data skew in joins
+///
+/// For example for a HashJoin with keys(a, b, c), the required Distribution(a, b, c) can be satisfied by
+/// several alternative partitioning ways: [(a, b, c), (a, b), (a, c), (b, c), (a), (b), (c), ( )].
+///
+/// This rule only chooses the exactly match and satisfies the Distribution(a, b, c) by a HashPartition(a, b, c).
+#[derive(Default)]
+pub struct BasicEnforcement {}
+
+impl BasicEnforcement {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl PhysicalOptimizerRule for BasicEnforcement {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        config: &SessionConfig,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Distribution and Ordering enforcement need to be applied bottom-up.
+        let target_partitions = config.target_partitions;
+        transform_up(plan, &{
+            |plan| Some(ensure_distribution_and_ordering(plan, target_partitions))
+        })
+    }
+
+    fn name(&self) -> &str {
+        "BasicEnforcement"
+    }
+}
+
+fn ensure_distribution_and_ordering(
+    plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
+    target_partitions: usize,
+) -> Arc<dyn crate::physical_plan::ExecutionPlan> {
+    if plan.children().is_empty() {
+        return plan;
+    }
+    let required_input_distributions = plan.required_input_distribution();
+    let required_input_orderings = plan.required_input_ordering();
+    let children: Vec<Arc<dyn ExecutionPlan>> = plan.children();
+    assert_eq!(children.len(), required_input_distributions.len());
+    assert_eq!(children.len(), required_input_orderings.len());
+
+    // Add RepartitionExec to guarantee output partitioning
+    let children = children
+        .into_iter()
+        .zip(required_input_distributions.into_iter())
+        .map(|(child, required)| {
+            if child
+                .output_partitioning()
+                .satisfy(required.clone(), || child.equivalence_properties())
+            {
+                child
+            } else {
+                let new_child: Arc<dyn ExecutionPlan> = match required {
+                    Distribution::SinglePartition
+                        if child.output_partitioning().partition_count() > 1 =>
+                    {
+                        Arc::new(CoalescePartitionsExec::new(child.clone()))
+                    }
+                    _ => {
+                        let partition = required.create_partitioning(target_partitions);
+                        Arc::new(RepartitionExec::try_new(child, partition).unwrap())
+                    }
+                };
+                new_child
+            }
+        });
+
+    // Add SortExec to guarantee output ordering
+    let new_children: Vec<Arc<dyn ExecutionPlan>> = children
+        .zip(required_input_orderings.into_iter())
+        .map(|(child, required)| {
+            if ordering_satisfy(child.output_ordering(), required, || {
+                child.equivalence_properties()
+            }) {
+                child
+            } else {
+                let sort_expr = required.unwrap().to_vec();
+                if child.output_partitioning().partition_count() > 1 {
+                    Arc::new(SortExec::new_with_partitioning(
+                        sort_expr, child, true, None,
+                    ))
+                } else {
+                    Arc::new(SortExec::try_new(sort_expr, child, None).unwrap())
+                }
+            }
+        })
+        .collect::<Vec<_>>();
+
+    with_new_children_if_necessary(plan, new_children).unwrap()
+}
+
+/// DynamicEnforcement rule
+///
+///
+#[derive(Default)]
+pub struct DynamicEnforcement {}
+
+// TODO

Review Comment:
   What is this supposed to do?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org