Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/05/25 10:39:16 UTC

[GitHub] [arrow-datafusion] tustvold opened a new issue #424: Operator Ordering

tustvold opened a new issue #424:
URL: https://github.com/apache/arrow-datafusion/issues/424


   As identified in #423 and #378 (and #412) there needs to be a mechanism for physical operators to express their behaviour with respect to sort order, so that optimisation passes can handle it correctly. 
   
   It is assumed that the initial physical plan created from the logical plan is valid, and that the requirement is for the optimisation phase not to alter the plan in a way that violates its implicit ordering requirements. I think it is therefore sufficient to encode some notion of sort sensitivity, as opposed to what the sort order actually is. Any optimisations related to the specific sort orders in use would take place at the `LogicalPlan` level; avoiding this at the physical layer sidesteps issues around equality for `PhysicalExpr`s, etc.
   
   The proposal would be to introduce a new member function to `ExecutionPlan` called `partition_order()` that returns a variant of a new enum `PartitionOrder`. This would have three variants:
   
   * `PartitionOrder::Preserving` - operations that preserve the ordering of their input partition(s) - e.g. `FilterExec`, `CoalesceBatchesExec`
   * `PartitionOrder::Sensitive` - operations which rely on the order of their input partition(s) - e.g. `GlobalLimitExec`, `LocalLimitExec` 
   * `PartitionOrder::Agnostic` - operations which do not rely on, nor preserve the order of their input partition(s) - e.g. `HashAggregateExec`, `MergeExec`, `RepartitionExec`
   
   Note that this formulation does not distinguish between one or many partitions, as that is a detail already encapsulated by `required_child_distribution` (although I do wonder if this should be a property of the plan and not the operators themselves). There is no mechanism to express an ordering requirement across partitions; I'm not sure that this would be useful.
   
   The default implementation of `partition_order()` would return `PartitionOrder::Sensitive`. Or to put it another way, unless explicitly told otherwise the optimiser cannot assume that an operator isn't sensitive to the ordering of its inputs.
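   Concretely, the proposal could be sketched as below. This is a simplified stand-in for the real `ExecutionPlan` trait; `PartitionOrder`, `partition_order()`, and `SomeOpaqueExec` are names used for illustration here, not existing DataFusion API:
   
   ```rust
   /// The three proposed variants, with the examples given above.
   #[derive(Debug, Clone, Copy, PartialEq, Eq)]
   pub enum PartitionOrder {
       /// Preserves the ordering of its input partition(s), e.g. `FilterExec`.
       Preserving,
       /// Relies on the order of its input partition(s), e.g. `GlobalLimitExec`.
       Sensitive,
       /// Neither relies on nor preserves input order, e.g. `HashAggregateExec`.
       Agnostic,
   }
   
   /// Simplified stand-in for DataFusion's `ExecutionPlan` trait.
   pub trait ExecutionPlan {
       /// Conservative default: without further information, the optimiser
       /// must assume the operator is sensitive to input ordering.
       fn partition_order(&self) -> PartitionOrder {
           PartitionOrder::Sensitive
       }
   }
   
   pub struct FilterExec;
   impl ExecutionPlan for FilterExec {
       fn partition_order(&self) -> PartitionOrder {
           PartitionOrder::Preserving
       }
   }
   
   /// An operator that does not override the method gets the safe default.
   pub struct SomeOpaqueExec;
   impl ExecutionPlan for SomeOpaqueExec {}
   ```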
   
   The `Repartition` pass would then be modified to only insert a `RepartitionExec` on branches of the graph where any `PartitionOrder::Sensitive` operation is separated from the insertion point by an intervening `PartitionOrder::Agnostic` operator. This would fix #423. `AddMergeExec` could additionally be modified to error if it finds itself needing to insert a `MergeExec` on an order-sensitive branch.
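   The pass's legality check could be a simple walk from the candidate insertion point toward the plan root. The helper below is a hypothetical sketch (the enum from the proposal is restated to keep the example self-contained):
   
   ```rust
   #[derive(Debug, Clone, Copy, PartialEq, Eq)]
   pub enum PartitionOrder {
       Preserving,
       Sensitive,
       Agnostic,
   }
   
   /// Hypothetical legality check for the `Repartition` pass: given the
   /// `partition_order()` of each ancestor from the candidate insertion
   /// point up to the root, repartitioning is safe only if no
   /// order-sensitive operator is reached before an order-agnostic one.
   pub fn safe_to_repartition(ancestors: &[PartitionOrder]) -> bool {
       for order in ancestors {
           match order {
               // An ancestor depends on its input order: inserting a
               // RepartitionExec below it would violate that requirement.
               PartitionOrder::Sensitive => return false,
               // An agnostic ancestor absorbs any reordering; nothing
               // above it can observe the repartition.
               PartitionOrder::Agnostic => return true,
               // Order-preserving operators just pass the question upward.
               PartitionOrder::Preserving => {}
           }
       }
       // Reached the root without hitting a sensitive operator.
       true
   }
   ```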
   
   Eventually, since certain forms of `RepartitionExec` are order preserving (e.g. splitting a single partition into multiple), this could be codified and combined with a modified version of `AddMergeExec` that inserts an order-preserving merge. This would fit naturally into the proposed framework.
   
   I'm not sure how ordering is typically handled in query engines, so if there is a standard solution I'd be happy to go with that instead, but I thought I'd write up the simplest solution I can see to the issue in #423.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] andygrove commented on issue #424: Design how to respect output stream ordering

andygrove commented on issue #424:
URL: https://github.com/apache/arrow-datafusion/issues/424#issuecomment-848018100


   I agree that doing this at the logical plan level is simpler. My concern is that later on, if/when we make Ballista viable, we may want to dynamically optimize the query while it is executing, based on statistics from completed stages, and I was hoping to avoid Spark's approach of going back to the logical plan for this, and just optimize the physical plan. However, we can always revisit this later.





[GitHub] [arrow-datafusion] alamb closed issue #424: Design how to respect output stream ordering

alamb closed issue #424:
URL: https://github.com/apache/arrow-datafusion/issues/424


   





[GitHub] [arrow-datafusion] andygrove commented on issue #424: Operator Ordering

andygrove commented on issue #424:
URL: https://github.com/apache/arrow-datafusion/issues/424#issuecomment-847857451


   We may want to consider following Spark's approach here, since we are already implementing other methods such as `required_child_distribution` based on Spark.
   
   For sorting, Spark has:
   
   ```scala
   /** Specifies sort order for each partition requirements on the input data for this operator. */
   def requiredChildOrdering: Seq[Seq[SortOrder]]
   ```






[GitHub] [arrow-datafusion] alamb commented on issue #424: Design how to respect output stream ordering

alamb commented on issue #424:
URL: https://github.com/apache/arrow-datafusion/issues/424#issuecomment-1031940845


   Proposed fix: https://github.com/apache/arrow-datafusion/pull/1776





[GitHub] [arrow-datafusion] alamb commented on issue #424: Design how to respect output stream ordering

alamb commented on issue #424:
URL: https://github.com/apache/arrow-datafusion/issues/424#issuecomment-847981350


   My thoughts:
   
   I think it will be simpler, as @tustvold has suggested, to do the majority (if not all) of the sort-based optimizations (e.g. optimizing away a `Sort`) at the `LogicalPlan` level, rather than in the physical plan. That way:
   1. We can work with `Exprs` rather than `PhysicalExpr`s. 
   2. The knowledge of sort order can also feed into potential cost model decisions too (e.g. join ordering, algorithm selection)
   
   Encoding the requirements / assumptions of `LogicalPlan` nodes via `outputOrdering` or `requiredChildOrdering` seems like a good idea to me.
   
   In terms of physical plans, what about adding something like `ExecutionPlan::requires_output_sort()` that would communicate to the various physical optimizer passes when they had to preserve the output sort (and thus might preclude things like `RepartitionExec` from rewriting the plan)?
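   A minimal sketch of that idea on a simplified stand-in trait; the `false` default and the example operators are assumptions, not something specified above:
   
   ```rust
   /// Simplified stand-in for DataFusion's `ExecutionPlan` trait, showing
   /// only the hypothetical flag discussed here.
   pub trait ExecutionPlan {
       /// When true, physical optimizer passes (e.g. the repartition pass)
       /// must not rewrite this node's input in a way that changes its order.
       fn requires_output_sort(&self) -> bool {
           false
       }
   }
   
   pub struct GlobalLimitExec;
   impl ExecutionPlan for GlobalLimitExec {
       fn requires_output_sort(&self) -> bool {
           true // a limit's result depends on the order of its input
       }
   }
   
   /// Operators with no ordering requirement just keep the default.
   pub struct HashAggregateExec;
   impl ExecutionPlan for HashAggregateExec {}
   ```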





[GitHub] [arrow-datafusion] andygrove edited a comment on issue #424: Operator Ordering

andygrove edited a comment on issue #424:
URL: https://github.com/apache/arrow-datafusion/issues/424#issuecomment-847857451


   We may want to consider following Spark's approach here, since we are already implementing other methods such as `required_child_distribution` based on Spark.
   
   For sorting, Spark has:
   
   ```scala
   /** Specifies how data is ordered in each partition. */
   def outputOrdering: Seq[SortOrder] = Nil
   
   /** Specifies sort order for each partition requirements on the input data for this operator. */
   def requiredChildOrdering: Seq[Seq[SortOrder]]
   ```
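   A hypothetical Rust translation of those two Spark methods onto a simplified `ExecutionPlan` trait might look like the following; `SortOrder` here is a placeholder type, not DataFusion's actual sort representation:
   
   ```rust
   /// Placeholder for a per-column sort specification (column + direction).
   #[derive(Debug, Clone, PartialEq, Eq)]
   pub struct SortOrder {
       pub column: String,
       pub descending: bool,
   }
   
   /// Simplified stand-in for DataFusion's `ExecutionPlan` trait, with
   /// hypothetical analogues of Spark's ordering methods.
   pub trait ExecutionPlan {
       /// How the data produced by this operator is ordered within each
       /// partition; empty means no guaranteed ordering (Spark's `Nil`).
       fn output_ordering(&self) -> Vec<SortOrder> {
           Vec::new()
       }
   
       /// The ordering this operator requires of each of its children,
       /// one entry per child; empty means no requirement.
       fn required_child_ordering(&self) -> Vec<Vec<SortOrder>> {
           Vec::new()
       }
   }
   
   pub struct SortExec {
       pub order: Vec<SortOrder>,
   }
   
   impl ExecutionPlan for SortExec {
       /// A sort guarantees exactly the ordering it was asked to produce.
       fn output_ordering(&self) -> Vec<SortOrder> {
           self.order.clone()
       }
   }
   ```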





[GitHub] [arrow-datafusion] alamb commented on issue #424: Design how to respect output stream ordering

alamb commented on issue #424:
URL: https://github.com/apache/arrow-datafusion/issues/424#issuecomment-848053786


   > if/when we make Ballista viable, we may want to dynamically optimize the query while it is executing,
   
   I do think dynamically changing the plan (or algorithms) based on actual execution experience is the current state of the art. I have often wondered if it is better done within the operators themselves (like maybe a join deciding to switch to sort-merge join when it fills up its hash tables, or sampling both inputs to decide which is smaller, etc.).
   
   I would have to think about the kinds of dynamic plan changes we might want to make.

