Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/04/26 13:18:29 UTC

[GitHub] [arrow-datafusion] alamb opened a new issue #92: [Rust] Physical plan refactor to support optimization rules and more efficient use of threads

alamb opened a new issue #92:
URL: https://github.com/apache/arrow-datafusion/issues/92


   *Note*: migrated from original JIRA: https://issues.apache.org/jira/browse/ARROW-9464
   
   I would like to propose a refactor of the physical/execution planning based on the experience I have had in implementing distributed execution in Ballista.
   
   This will likely need subtasks but here is an overview of the changes I am proposing.
   h3. *Introduce physical plan optimization rule to insert "shuffle" operators*
   
   We should extend the ExecutionPlan trait so that each operator can specify its input and output partitioning needs, and then have an optimization rule that can insert any repartitioning or reordering steps required.
   
   For example, these are the methods that would be added to ExecutionPlan. This design is based on the equivalent methods in Apache Spark's physical planning.
   
   {code:rust}
   /// Specifies how data is partitioned across different nodes in the cluster
   fn output_partitioning(&self) -> Partitioning {
       Partitioning::UnknownPartitioning(0)
   }
   
   /// Specifies the data distribution requirements of all the children for this operator
   fn required_child_distribution(&self) -> Distribution {
       Distribution::UnspecifiedDistribution
   }
   
   /// Specifies how data is ordered in each partition
   fn output_ordering(&self) -> Option<Vec<SortOrder>> {
       None
   }
   
   /// Specifies the data ordering requirements of all the children for this operator
   fn required_child_ordering(&self) -> Option<Vec<Vec<SortOrder>>> {
       None
   }
   {code}
   A good example of applying this rule is a hash aggregate, where we perform a partial aggregate in parallel across partitions, then coalesce the results into a single partition and apply a final hash aggregate.
   
   Another example would be a SortMergeExec specifying the sort order required for its children.
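   As a rough illustration of how such a rule could work, here is a minimal sketch in Rust. The `Partitioning`/`Distribution` variants, the `CoalescePartitionsExec` operator, and the `enforce_distribution` function are illustrative assumptions rather than the final API; only the "gather everything into a single partition" case (the hash aggregate example above) is shown.
   
   {code:rust}
   use std::sync::Arc;
   
   /// How an operator's output is split across partitions (pared down for the sketch).
   pub enum Partitioning {
       UnknownPartitioning(usize),
   }
   
   /// What an operator requires of its children's partitioning (pared down for the sketch).
   pub enum Distribution {
       UnspecifiedDistribution,
       SinglePartition,
   }
   
   /// A pared-down version of the trait extension proposed above.
   pub trait ExecutionPlan {
       fn children(&self) -> Vec<Arc<dyn ExecutionPlan>>;
       fn output_partitioning(&self) -> Partitioning;
       fn required_child_distribution(&self) -> Distribution {
           Distribution::UnspecifiedDistribution
       }
       fn with_new_children(&self, children: Vec<Arc<dyn ExecutionPlan>>) -> Arc<dyn ExecutionPlan>;
   }
   
   /// Hypothetical operator that merges all of its input's partitions into one.
   pub struct CoalescePartitionsExec {
       input: Arc<dyn ExecutionPlan>,
   }
   
   impl ExecutionPlan for CoalescePartitionsExec {
       fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
           vec![self.input.clone()]
       }
       fn output_partitioning(&self) -> Partitioning {
           // By definition this operator produces a single output partition.
           Partitioning::UnknownPartitioning(1)
       }
       fn with_new_children(&self, mut children: Vec<Arc<dyn ExecutionPlan>>) -> Arc<dyn ExecutionPlan> {
           Arc::new(CoalescePartitionsExec { input: children.remove(0) })
       }
   }
   
   /// Optimization rule: walk the plan bottom-up and, wherever a parent requires a
   /// single partition but a child produces several (e.g. a final hash aggregate
   /// above a partial one), insert a CoalescePartitionsExec between them.
   pub fn enforce_distribution(plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
       let children = plan.children();
       if children.is_empty() {
           return plan;
       }
       let new_children: Vec<Arc<dyn ExecutionPlan>> = children
           .into_iter()
           .map(enforce_distribution) // fix up each subtree first
           .map(|child| match (plan.required_child_distribution(), child.output_partitioning()) {
               (Distribution::SinglePartition, Partitioning::UnknownPartitioning(n)) if n > 1 => {
                   Arc::new(CoalescePartitionsExec { input: child }) as Arc<dyn ExecutionPlan>
               }
               _ => child,
           })
           .collect();
       plan.with_new_children(new_children)
   }
   {code}
   
   A fuller rule would also need to handle hash repartitioning for operators that require co-partitioned inputs, and sort-order enforcement for operators like SortMergeExec.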
   
    
   
    





[GitHub] [arrow-datafusion] alamb commented on issue #92: [Rust] Physical plan refactor to support optimization rules and more efficient use of threads

Posted by GitBox <gi...@apache.org>.
alamb commented on issue #92:
URL: https://github.com/apache/arrow-datafusion/issues/92#issuecomment-826828467


   Comment from Andy Grove (andygrove) @ 2020-08-12T15:19:33.316+0000:
   <pre>Based on recent experience testing query execution with async, I no longer feel that async makes sense for DataFusion. Async is good for network I/O but not for file I/O. It is better to have a single thread per partition when executing queries.
   
   Also, we can't currently use async with Parquet without launching a dedicated thread per partition, which pretty much defeats the point of using async in the first place.
   
   I believe that we do need the concept of executors and a scheduler in DataFusion, where each executor would run on a dedicated thread. Other projects would then be able to extend this for distributed execution, for example.</pre>
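   For illustration, the "one dedicated thread per partition" model discussed above looks roughly like the sketch below; `execute_partition` is a hypothetical stand-in for scanning and processing a single partition, and the point is simply that the number of OS threads grows with the number of partitions.
   
   {code:rust}
   use std::thread;
   
   /// Hypothetical per-partition work: imagine blocking Parquet reads plus
   /// operator execution here; we just return a dummy row count.
   fn execute_partition(partition: usize) -> usize {
       println!("executing partition {partition}");
       partition * 100
   }
   
   fn main() {
       let num_partitions = 8;
   
       // One dedicated OS thread per partition: fine for a handful of partitions,
       // but it does not scale once partitions outnumber cores.
       let handles: Vec<_> = (0..num_partitions)
           .map(|p| thread::spawn(move || execute_partition(p)))
           .collect();
   
       let total_rows: usize = handles.into_iter().map(|h| h.join().unwrap()).sum();
       println!("total rows across partitions: {total_rows}");
   }
   {code}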
   
   Comment from Adam Lippai (alippai) @ 2020-08-12T15:33:01.481+0000:
   <pre>I think using sync file I/O is a good compromise; Arrow and DataFusion don't perform low-latency or highly concurrent file I/O, at least not yet.
   
   Does "It is better to have a single thread per partition when executing queries." contradict "we do need the concept of executors and a scheduler in DataFusion"?
   What do you think about my initial concern regarding the maximum number of threads?
   Does limiting the concurrency or using a thread pool make sense?
   
   If I have a partitioned dataset (let's say 1000 or 10k files), each with 1000 columns, I should be able to read and process it without spawning that many threads _at once_.</pre>
   
   Comment from Andy Grove (andygrove) @ 2020-08-12T15:58:30.506+0000:
   <pre>I did a slightly better job of explaining this in https://issues.apache.org/jira/browse/ARROW-9707
   
   "The current threading model is very simple and does not scale. We currently use 1-2 dedicated threads per partition and they all run simultaneously, which is a huge problem if you have more partitions than logical or physical cores.
   This task is to re-implement the threading model so that query execution uses a fixed (configurable) number of threads. Work will be broken down into stages and tasks and each in-process executor (running on a dedicated thread) will process its queue of tasks.
   
   This process will be driven by a scheduler."</pre>
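   A minimal sketch of that model, assuming nothing about DataFusion's eventual scheduler API: a configurable number of worker threads (the in-process executors) drain a shared queue of per-partition tasks, and the scheduler simply enqueues the tasks for each stage. `Task` and `run_task` are illustrative placeholders.
   
   {code:rust}
   use std::sync::{mpsc, Arc, Mutex};
   use std::thread;
   
   /// One unit of work, e.g. "run this plan fragment against partition N".
   struct Task {
       stage: usize,
       partition: usize,
   }
   
   fn run_task(task: &Task) {
       // Real code would execute a plan fragment here.
       println!("stage {} partition {}", task.stage, task.partition);
   }
   
   fn main() {
       let num_workers = 4; // configurable, independent of the number of partitions
       let (tx, rx) = mpsc::channel::<Task>();
       let rx = Arc::new(Mutex::new(rx)); // one shared task queue for all workers
   
       // Fixed pool of in-process executors, each on its own dedicated thread.
       let workers: Vec<_> = (0..num_workers)
           .map(|_| {
               let rx = Arc::clone(&rx);
               thread::spawn(move || loop {
                   // Lock only long enough to pull the next task off the queue.
                   let task = rx.lock().unwrap().recv();
                   match task {
                       Ok(task) => run_task(&task),
                       Err(_) => break, // channel closed: no more tasks
                   }
               })
           })
           .collect();
   
       // The "scheduler": break the query into stages/tasks and enqueue them.
       // 1000 partitions no longer means 1000 threads.
       for partition in 0..1000 {
           tx.send(Task { stage: 0, partition }).unwrap();
       }
       drop(tx); // signal that no more tasks are coming
   
       for w in workers {
           w.join().unwrap();
       }
   }
   {code}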
   
   Comment from Andy Grove (andygrove) @ 2020-08-23T15:06:44.701+0000:
   <pre>Issue resolved by pull request 8029
   [https://github.com/apache/arrow/pull/8029]</pre>
   
   Comment from Andy Grove (andygrove) @ 2020-08-26T02:14:08.199+0000:
   <pre>Issue resolved by pull request 8034
   [https://github.com/apache/arrow/pull/8034]</pre>
   
   Comment from Andy Grove (andygrove) @ 2020-10-03T18:25:46.419+0000:
   <pre>There are subtasks that are not complete yet. Reopening this for 3.0.0</pre>





[GitHub] [arrow-datafusion] rdettai commented on issue #92: Physical plan refactor to support optimization rules and more efficient use of threads

Posted by GitBox <gi...@apache.org>.
rdettai commented on issue #92:
URL: https://github.com/apache/arrow-datafusion/issues/92#issuecomment-954584058


   This issue seems half addressed, half open:
   - [x] required_child_distribution
   - [x] output_partitioning
   - [ ] output_ordering
   - [ ] required_child_ordering

