Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/03/25 11:57:16 UTC

[GitHub] [arrow-datafusion] yjshen commented on issue #2079: RFC: More Granular File Operators

yjshen commented on issue #2079:
URL: https://github.com/apache/arrow-datafusion/issues/2079#issuecomment-1078958940


   Thanks for bringing it up. 
   
   Recently, we've run into several situations that all come down to how query execution is parallelized:
   - Parallelizing the Parquet file scan and reducing filesystem operation overhead (the current Parquet `ChunkReader` API).
   - Incorporating the task concept, which is widely available in distributed computation frameworks such as Spark and Presto, into the DataFusion core: https://github.com/apache/arrow-datafusion/pull/1987
   - An improved scheduler to limit concurrency (i.e., don't scan 20 Parquet files in parallel if you have only two cores), [from Influx](https://github.com/influxdata/influxdb_iox/issues/3994).
   - Several earlier issues, such as #64 and #924.
   
   I think it might be the right time to rethink how to divide query working sets, how to partition/repartition data among operators, and how we should schedule tasks.
   
   My proposal for the overall scheduler and execution framework (previously outlined in https://github.com/apache/arrow-datafusion/pull/1987#discussion_r825384514) is:
   - Adopt stage-based execution in DataFusion core.
       - Divide the query execution plan into a DAG of stages, cutting at "exchange" operators or "pipeline breakers".
       - For each stage, group all operators into a `Task`. Inside each Task, data flows through the operators serially, synchronously, and in a pipelined fashion.
       - For the root stages that read data directly from files, partition the input dataset based on a per-task size configuration (similar to `input.split.maxSize` for MapReduce and the equivalent settings for Spark/Presto).
       - For non-root stages, we could either adopt a fixed `num_partition` or determine the number of partitions from the data size observed at runtime.
   
   - A shared scheduler framework for both DataFusion and Ballista.
     - Schedule tasks based on stage dependencies and on the number of available cores.
   
   - Ultimately, I expect finer-grained execution in the DataFusion core, as described in [Morsel-driven parallelism](https://15721.courses.cs.cmu.edu/spring2016/papers/p743-leis.pdf) and [Push-pull](https://www.cambridge.org/core/services/aop-cambridge-core/content/view/D67AE4899E87F4B5102F859B0FC02045/S0956796818000102a.pdf/div-class-title-push-versus-pull-based-loop-fusion-in-query-engines-div.pdf).
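
To make the stage and input-split ideas above concrete, here is a minimal sketch in plain Rust. It is not DataFusion code: `Op`, `split_into_stages`, `FileSplit`, and `plan_splits` are hypothetical names invented for illustration. It assumes a linear operator chain (a real plan is a tree that yields a DAG of stages) and splits files by raw byte ranges (a real Parquet reader would presumably align splits to row-group boundaries).

```rust
/// Hypothetical operator kinds; `Exchange` stands in for any pipeline breaker.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Op {
    Scan,
    Filter,
    Projection,
    Exchange,
    Aggregate,
}

/// Cut a linear operator chain into stages at every `Exchange`.
/// The operators of one stage would be grouped into a single `Task`.
pub fn split_into_stages(chain: &[Op]) -> Vec<Vec<Op>> {
    let mut stages = vec![Vec::new()];
    for &op in chain {
        if op == Op::Exchange {
            // A pipeline breaker closes the current stage and opens a new one.
            stages.push(Vec::new());
        } else {
            stages.last_mut().unwrap().push(op);
        }
    }
    // Drop empty stages produced by leading, trailing, or adjacent exchanges.
    stages.retain(|s| !s.is_empty());
    stages
}

/// A byte range of one input file, to be read by a single root-stage task.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileSplit {
    pub path: String,
    pub offset: u64,
    pub length: u64,
}

/// Partition input files into splits of at most `max_split_bytes` each,
/// in the spirit of a per-task size configuration.
pub fn plan_splits(files: &[(String, u64)], max_split_bytes: u64) -> Vec<FileSplit> {
    assert!(max_split_bytes > 0, "max_split_bytes must be positive");
    let mut splits = Vec::new();
    for (path, size) in files {
        let mut offset: u64 = 0;
        while offset < *size {
            let length = max_split_bytes.min(*size - offset);
            splits.push(FileSplit {
                path: path.clone(),
                offset,
                length,
            });
            offset += length;
        }
    }
    splits
}
```

With this sketch, a chain of `Scan, Filter, Exchange, Aggregate` becomes two stages, and a 250-byte file with a 100-byte split size becomes three splits, so the number of root-stage tasks follows from the data size rather than from the number of files.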
   
   By the method proposed above, we could also achieve:
    >   limit concurrently (don't scan 20 parquet files in parallel if you have only two cores)
   
   with the help of the TaskScheduler, and achieve:
   
     > can instead construct a more complex physical plan containing the necessary ProjectionExec, SchemaAdapter
   
   by an existing, sequentially executed, Task.
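
As a rough illustration of how a task scheduler could bound concurrency by core count while respecting stage dependencies, the sketch below simulates scheduling in "waves". It is an assumption-laden toy, not the proposed TaskScheduler: `Stage` and `schedule` are hypothetical names, and a real scheduler would start a new task as soon as a core frees up instead of waiting for a whole wave to finish.

```rust
use std::collections::HashSet;

/// Hypothetical stage description: a number of independent tasks plus
/// the ids of the stages that must finish before this one may start.
#[derive(Debug, Clone)]
pub struct Stage {
    pub id: usize,
    pub num_tasks: usize,
    pub depends_on: Vec<usize>,
}

/// Simulate scheduling. Each returned wave lists the `(stage_id, task_index)`
/// pairs that run concurrently; a wave never holds more than `cores` tasks.
pub fn schedule(stages: &[Stage], cores: usize) -> Vec<Vec<(usize, usize)>> {
    assert!(cores > 0, "need at least one core");
    let mut done: HashSet<usize> = HashSet::new();
    let mut waves = Vec::new();
    while done.len() < stages.len() {
        // A stage is ready once all of its dependencies have completed.
        let ready: Vec<&Stage> = stages
            .iter()
            .filter(|s| !done.contains(&s.id) && s.depends_on.iter().all(|d| done.contains(d)))
            .collect();
        assert!(!ready.is_empty(), "dependency cycle or unknown stage id");
        // Expand the ready stages into their individual tasks.
        let tasks: Vec<(usize, usize)> = ready
            .iter()
            .flat_map(|s| {
                let id = s.id;
                (0..s.num_tasks).map(move |t| (id, t))
            })
            .collect();
        // Run at most `cores` tasks at a time.
        for wave in tasks.chunks(cores) {
            waves.push(wave.to_vec());
        }
        for s in ready {
            done.insert(s.id);
        }
    }
    waves
}
```

For a scan stage with 20 tasks followed by a dependent single-task stage, `schedule(&stages, 2)` yields ten waves of two scan tasks and one final wave for the downstream stage, which is the "don't scan 20 files on two cores" behavior in miniature.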
   
   cc @houqp @andygrove @liukun4515 @yahoNanJing @mingmwang. 
   

