You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/03/24 18:16:35 UTC

[GitHub] [arrow-datafusion] tustvold opened a new issue #2079: RFC: More Granular File Operators

tustvold opened a new issue #2079:
URL: https://github.com/apache/arrow-datafusion/issues/2079


   **Is your feature request related to a problem or challenge? Please describe what you are trying to do.**
   
   Currently the file scan operators such as `ParquetExec`, `CsvExec`, etc... are created with a `FileScanConfig`, which internally contains a list of `PartitionedFile`. These `PartitionedFile` are provided grouped together in "file groups". For each of these groups, the operators expose a DataFusion partition which will scan these files sequentially.
   
   Whilst this works, I'm getting a little bit concerned we are ending up with quite a lot of complexity within each of the individual, file-format specific operators:
   
   * The individual files within a file group may have differing schema - #1669
   * If using Hive partitioning, need to project the additional rows from the partition key - #1139
   * Edge cases where the file isn't even needed - #1999
   * Intra-file parallelism - #1990
   
   This in turn comes with some downsides:
   
   * Code duplication between operators for different formats with potential for feature and functionality divergence
   * The operators are getting very large and quite hard to reason about
   * Execution details are hidden from the physical plan, potentially limiting parallelism, optimisation, introspection, etc...
   * Catalog details, such as the partitioning scheme, leak into the physical operators themselves
   
   **Describe the solution you'd like**
   
   It isn't a fully formed proposal, but I wonder if instead of continuing to extend the individual file format operators we might instead compose together simpler physical operators within the query plan. Specifically I wonder if we might make it so that the `ParquetExec`, `CsvExec` operators handle a single file, and the plan stage within `TableProvider::scan` instead constructs a more complex physical plan containing the necessary `ProjectionExec`, `SchemaAdapter` operators as necessary.
   
   For what it is worth, IOx uses a similar approach https://github.com/influxdata/influxdb_iox/blob/main/query/src/provider.rs#L282 and it works quite well.
   
   **Describe alternatives you've considered**
   
   The current approach could remain
   
   **Additional context**
   
   I'm trying to take a more holistic view on what the parquet interface upstream should look like, which is somewhat related to this https://github.com/apache/arrow-rs/issues/1473
   
   FYI @rdettai @yjshen @alamb 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] yjshen commented on issue #2079: RFC: More Granular File Operators

Posted by GitBox <gi...@apache.org>.
yjshen commented on issue #2079:
URL: https://github.com/apache/arrow-datafusion/issues/2079#issuecomment-1083205518


   > Translation of a LogicalPlan::TableScan into a corresponding ExecutionPlan
   
   Sounds great. We could eliminate much of the common logic scattered in different file formats, as you mentioned. And yes, this makes sense!
   
   >  removing the ability to scan multiple files within a single operator, and instead composing multiple per-file operators in the generated ExecutionPlan.
   
   I'm confused here: How would we parallelize the physical operators after this `TableScanOp`? And how do we control this `single` operators parallelism? Will the data batch be sent through a multi-producer(the single ops)-multi-consumer(like filter/project/sort) queue?
   
   >  compose the SendableRecordBatchStream together. I think making this more explicit sounds like a good idea, but I'm not sure it is a simple case of grouping operators together, and would likely involve altering the contract of ExecutionPlan to be more explicit about what operators spawn additional parallel tasks. Perhaps this is what you're proposing?
   
   What do you think if we avoid `async` and `stream` from normal operators' `execute()`? just like you and @alamb mentioned merging project/filter logic into scan operator, these pure computations are async free in my opinion, they are data or computation-intensive and do not talk with IO systems. What do you think if we have:
   
   <img width="847" alt="image" src="https://user-images.githubusercontent.com/1387718/160857783-cd666871-8063-433a-92b3-98cc097ca117.png">
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] tustvold edited a comment on issue #2079: RFC: More Granular File Operators

Posted by GitBox <gi...@apache.org>.
tustvold edited a comment on issue #2079:
URL: https://github.com/apache/arrow-datafusion/issues/2079#issuecomment-1079001425


   I agree entirely that the current scheduling within DataFusion, which effectively punts onto tokio and hopes it does something vaguely appropriate, is likely sub-optimal. In fact one of the issues I'm having with #1617 appears to be that sub-optimal task scheduling is causing a 2x performance regression.
   
   That being said, I think there are three problems here and it would be advantageous in my mind to keep them separate:
   
   * Translation of a `LogicalPlan::TableScan` into a corresponding `ExecutionPlan`
   * Optimisation of this `ExecutionPlan` to potentially introduce additional parallelism
   * Scheduling of this `ExecutionPlan` to actually perform its computation
   
   This ticket is then concerned solely with the first of these, and reducing the complexity of the file-format specific operators necessary to achieve this. I think the remaining two problems can and should be kept separate, in part because they have broader scope than just scanning files.
   
   _FWIW we sort of already do the stage-based execution you describe, but it is implicit based on whether operators call `tokio::spawn` or just compose the `SendableRecordBatchStream` together. I think making this more explicit sounds like a good idea, but I'm not sure it is a simple case of grouping operators together, and would likely involve altering the contract of `ExecutionPlan` to be more explicit about what operators spawn additional parallel tasks. Perhaps this is what you're proposing?_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] tustvold edited a comment on issue #2079: RFC: More Granular File Operators

Posted by GitBox <gi...@apache.org>.
tustvold edited a comment on issue #2079:
URL: https://github.com/apache/arrow-datafusion/issues/2079#issuecomment-1083214449


   > How would we parallelize the physical operators after this TableScanOp?
   
   You would have multiple single-partition `ParquetExec` being fed into a single `UnionExec`, potentially with some `ProjectionExec`, etc... in between. As each Datafusion partition gets its own tokio task => parallelism. If you wanted parallelism within a single file, you would have an optimizer pass that would replace the single `ParquetExec` with multiple with disjoint row groups, again this would be fed into a `UnionExec`.
   
   > What do you think if we avoid async and stream from normal operators' execute()?
   
   Let me get back to you on this, it is something I am currently mulling about and experimenting with. I agree that using async for CPU-bound work seems a little wonky, but as @alamb articulated [here](https://thenewstack.io/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/) there are reasons that it may be the pragmatic choice. I'm trying to collect some data so we can make an informed decision :sweat_smile: 
   
   _FWIW as you link to the morsel driven paper - what you describe I think is closer to the more traditional plan-driven parallelism than morsel-driven parallelism. Tokio is much closer to that paper than what you describe as it incorporates notions of dynamic scheduling and work-stealing, rayon may be even closer_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] yjshen edited a comment on issue #2079: RFC: More Granular File Operators

Posted by GitBox <gi...@apache.org>.
yjshen edited a comment on issue #2079:
URL: https://github.com/apache/arrow-datafusion/issues/2079#issuecomment-1083205518


   > Translation of a LogicalPlan::TableScan into a corresponding ExecutionPlan
   
   Sounds great. We could eliminate much of the common logic scattered in different file formats, as you mentioned. And yes, this makes sense!
   
   >  removing the ability to scan multiple files within a single operator, and instead composing multiple per-file operators in the generated ExecutionPlan.
   
   I'm confused here: How would we parallelize the physical operators after this `TableScanOp`? And how do we control this `single` operators parallelism? Will the data batch be sent through a multi-producer(the single ops)-multi-consumer(like filter/project/sort) queue?
   
   >  compose the SendableRecordBatchStream together. I think making this more explicit sounds like a good idea, but I'm not sure it is a simple case of grouping operators together, and would likely involve altering the contract of ExecutionPlan to be more explicit about what operators spawn additional parallel tasks. Perhaps this is what you're proposing?
   
   What do you think if we avoid `async` and `stream` from normal operators' `execute()`? just like you and @alamb mentioned merging project/filter logic into scan operator, these pure computations are async free in my opinion, they are data or computation-intensive and do not talk with IO systems. What do you think if we have (similar to that of [Morsel-driven](https://15721.courses.cs.cmu.edu/spring2016/papers/p743-leis.pdf)): 
   
   <img width="847" alt="image" src="https://user-images.githubusercontent.com/1387718/160857783-cd666871-8063-433a-92b3-98cc097ca117.png">
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] tustvold edited a comment on issue #2079: RFC: More Granular File Operators

Posted by GitBox <gi...@apache.org>.
tustvold edited a comment on issue #2079:
URL: https://github.com/apache/arrow-datafusion/issues/2079#issuecomment-1079001425


   I agree entirely that the current scheduling within DataFusion, which effectively punts onto tokio and hopes it does something vaguely appropriate, is likely sub-optimal. In fact one of the issues I'm having with #1617 appears to be that sub-optimal task scheduling is causing a 2x performance regression.
   
   That being said, I think there are three problems here and it would be advantageous in my mind to keep them separate:
   
   * Translation of a `LogicalPlan::TableScan` into a corresponding `ExecutionPlan`
   * Optimisation of this `ExecutionPlan` to potentially introduce additional parallelism
   * Scheduling of this `ExecutionPlan` to actually perform its computation
   
   This ticket is then concerned solely with the first of these, and reducing the complexity of the file-format specific operators necessary to achieve this. I think the remaining two problems can and should be kept separate, in part because they have broader scope than just scanning files.
   
   _FWIW we sort of already do the stage-based execution you describe, but it is implicit based on whether operators call `tokio::spawn` or just compose the `SendableRecordBatchStream` together. I think making this more explicit sounds like a good idea, but I'm not sure it is a simple case of grouping operators together, and would likely involve altering the contract of `ExecutionPlan` to be more explicit about what operators spawn additional parallel tasks. Perhaps this is what you're proposing?_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] tustvold commented on issue #2079: RFC: More Granular File Operators

Posted by GitBox <gi...@apache.org>.
tustvold commented on issue #2079:
URL: https://github.com/apache/arrow-datafusion/issues/2079#issuecomment-1083313672


   > Or do I miss something crucial that one SendableRecordBatchStream can be parallel processed by multiple tokio tasks
   
   Depends, the SendableRecordBatchStream itself can only be processed by a single tokio task correct, however, there is nothing to prevent that stream from actually being an mpsc channel with the actual work performed in other tasks in parallel. In fact this is exactly what CoalescePartitionsExec does, and the physical optimizer will add combinations of RepartitionExec and CoalescePartitionsExec to plans based on the target_partitions setting.
   
   Whilst target_partitions is typically set to the CPU thread count, RepartitionExec will typically appear multiple times in a given plan, and so this will result in more tasks than CPU cores. If there are other queries running concurrently, or the target_partitions is set higher, this will be even more pronounced. If you now squint, this is a first-order approximation of morsel-driven. It's far from perfect, the tokio scheduler is not in anyway NUMA-aware and in fact it optimises for load-distribution at the expense of thread-locality, but it is not hugely dissimilar.
   
   At least that's my hand-wavy argument :laughing: I happen to think rayon is closer in spirit, but I'm not sure how much of a difference that will make in practice.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] yjshen commented on issue #2079: RFC: More Granular File Operators

Posted by GitBox <gi...@apache.org>.
yjshen commented on issue #2079:
URL: https://github.com/apache/arrow-datafusion/issues/2079#issuecomment-1083283816


   > it is something I am currently mulling about and experimenting with. I agree that using async for CPU-bound work seems a little wonky, but as @alamb articulated [here](https://thenewstack.io/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/) there are reasons that it may be the pragmatic choice. I'm trying to collect some data so we can make an informed decision 😅
   
   Very much looking forward to it.
   
   > you describe I think is closer to the more traditional plan-driven parallelism than morsel-driven parallelism. Tokio is much closer to that paper than what you describe as it incorporates notions of dynamic scheduling and work-stealing, rayon may be even closer
   
   I think work-stealing in Morsel-driven and that in Tokio are quite different things. Having a rough partition of the whole dataset at the beginning, and **stealing part of data** from the skewed partition to idle working slots or CPU cores later is quite different from **task/green thread stealing** for Tokio. Or do I miss something crucial that one SendableRecordBatchStream can be parallel processed by multiple tokio tasks? 🤔


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] tustvold commented on issue #2079: RFC: More Granular File Operators

Posted by GitBox <gi...@apache.org>.
tustvold commented on issue #2079:
URL: https://github.com/apache/arrow-datafusion/issues/2079#issuecomment-1083214449


   > How would we parallelize the physical operators after this TableScanOp?
   
   You would have multiple single-partition `ParquetExec` being fed into a single `UnionExec`, potentially with some `ProjectionExec`, etc... in between. As each Datafusion partition gets its own tokio task => parallelism. If you wanted parallelism within a single file, you would have an optimizer pass that would replace the single `ParquetExec` with multiple with disjoint row groups, again this would be fed into a `UnionExec`.
   
   > What do you think if we avoid async and stream from normal operators' execute()?
   
   Let me get back to you on this, it is something I am currently mulling about and experimenting with. I agree that using async for CPU-bound work seems a little wonky, but as @alamb articulated [here](https://thenewstack.io/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/) there are reasons that it may be the pragmatic choice. I'm trying to collect some data so we can make an informed decision :sweat_smile: 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] tustvold edited a comment on issue #2079: RFC: More Granular File Operators

Posted by GitBox <gi...@apache.org>.
tustvold edited a comment on issue #2079:
URL: https://github.com/apache/arrow-datafusion/issues/2079#issuecomment-1079001425


   I agree entirely that the current scheduling within DataFusion, which effectively punts onto tokio and hopes it does something vaguely appropriate, is likely sub-optimal. In fact one of the issues I'm having with #1617 appears to be that sub-optimal task scheduling is causing a 2x performance regression.
   
   That being said, I think there are a number of problems here and it would be advantageous in my mind to keep them separate:
   
   * Translation of a `LogicalPlan::TableScan` into a corresponding `ExecutionPlan`
   * Optimisation of this `ExecutionPlan` to potentially introduce additional parallelism
   * Scheduling of this `ExecutionPlan` to actually perform its computation
   
   This ticket is then concerned solely with the first of these, and reducing the complexity of the file-format specific operators necessary to achieve this translation. In particular removing the ability to scan multiple files within a single operator, and instead composing multiple per-file operators in the generated `ExecutionPlan`. I think the remaining two problems can and should be kept separate, in part because they have broader scope than just scanning files. 
   
   Does this make sense, or am I missing something?
   
   _FWIW we sort of already do the stage-based execution you describe, but it is implicit based on whether operators call `tokio::spawn` or just compose the `SendableRecordBatchStream` together. I think making this more explicit sounds like a good idea, but I'm not sure it is a simple case of grouping operators together, and would likely involve altering the contract of `ExecutionPlan` to be more explicit about what operators spawn additional parallel tasks. Perhaps this is what you're proposing?_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] tustvold commented on issue #2079: RFC: More Granular File Operators

Posted by GitBox <gi...@apache.org>.
tustvold commented on issue #2079:
URL: https://github.com/apache/arrow-datafusion/issues/2079#issuecomment-1079001425


   I agree entirely that the current scheduling within DataFusion, which effectively punts onto tokio and hopes it does something vaguely appropriate, is likely sub-optimal. In fact one of the issues I'm having with #1617 appears to be that sub-optimal task scheduling is causing a 2x performance regression.
   
   That being said, I think there are three problems here and it would be advantageous in my mind to keep them separate:
   
   * Translation of a `LogicalPlan::TableScan` into a corresponding `ExecutionPlan`
   * Optimisation of this `ExecutionPlan` to potentially introduce additional parallelism
   * Scheduling of this `ExecutionPlan` to actually perform its computation
   
   This ticket is then concerned solely with the first of these, and reducing the complexity of the file-format specific operators necessary to achieve this. I think the remaining two problems can and should be kept separate, in part because they have broader scope than just scanning files.
   
   FWIW we sort of already do the stage-based execution you describe, but it is implicit based on whether operators call `tokio::spawn` or just compose the `SendableRecordBatchStream` together. I think making this more explicit sounds like a good idea, but I'm not sure it is a simple case of grouping operators together, and would likely involve altering the contract of `ExecutionPlan` to be more explicit about what operators spawn additional parallel tasks. Perhaps this is what you're proposing?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] yjshen edited a comment on issue #2079: RFC: More Granular File Operators

Posted by GitBox <gi...@apache.org>.
yjshen edited a comment on issue #2079:
URL: https://github.com/apache/arrow-datafusion/issues/2079#issuecomment-1078958940


   Thanks for bringing it up. 
   
   Recently, we've encountered several different circumstances to deal with query execution parallelism: 
   - To parallelize the parquet file scan and reduce filesystem operation overhead; (The current parquet chunkReader API).
   - To incorporate task concept, which is widely available in distributed computation frameworks such as Spark, presto, into the DataFusion core. https://github.com/apache/arrow-datafusion/pull/1987
   - Improved Scheduler to limit concurrently (i.e., don't scan 20 parquet files in parallel if you have only two cores) [From Influx](https://github.com/influxdata/influxdb_iox/issues/3994).
   - Together with several previous issues such as #64 #924
   
   I think it might be the right time to rethink how to divide query working sets, how to partition/repartition data among operators, and how we should schedule tasks.
   
   My opinion on this whole scheduler and execution framework is: (partly proposed in https://github.com/apache/arrow-datafusion/pull/1987#discussion_r825384514 and in our roadmap)
   - Adopt stage-based execution in DataFusion core.
       - Divide query execution plan based on "exchange" operators or "pipeline breakers" into a DAG of stages.
       - For each stage, group all operators into a `Task`. Processing data with operator logic serially, synced, pipelined inside each Task.
       - For the root stages that read data from files directly, partition input dataset based on a per-task size configuration. (similar to that of `input.split.maxSize` for MapReduce and similars for Spark/Presto). 
       - For non-root stages, we could either adopt a fixed `num_partition` or determine the number of partitions based on runtime generated size.
   
   - A shared scheduler framework for both DataFusion and Ballista.
     -  Schedule tasks based on stages dependency and schedule tasks based on available cores. 
   
   - Ultimately, I'm expecting a finer-grained execution for DataFusion core, as described in [Morsel-driven parallelism](https://15721.courses.cs.cmu.edu/spring2016/papers/p743-leis.pdf) and [Push-pull](https://www.cambridge.org/core/services/aop-cambridge-core/content/view/D67AE4899E87F4B5102F859B0FC02045/S0956796818000102a.pdf/div-class-title-push-versus-pull-based-loop-fusion-in-query-engines-div.pdf).
   
   By the method proposed above, we could also achieve:
    >   limit concurrently (don't scan 20 parquet files in parallel if you have only two cores)
   
   with the help of the TaskScheduler, and achieve:
   
     > can instead construct a more complex physical plan containing the necessary ProjectionExec, SchemaAdapter
   
   by an existing, sequentially executed, Task.
   
   cc @houqp @andygrove @liukun4515 @yahoNanJing @mingmwang. 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] yjshen edited a comment on issue #2079: RFC: More Granular File Operators

Posted by GitBox <gi...@apache.org>.
yjshen edited a comment on issue #2079:
URL: https://github.com/apache/arrow-datafusion/issues/2079#issuecomment-1078958940


   Thanks for bringing it up. 
   
   Recently, we've encountered several different circumstances to deal with query execution parallelism: 
   - To parallelize the parquet file scan and reduce filesystem operation overhead; (The current parquet chunkReader API).
   - To incorporate task concept, which is widely available in distributed computation frameworks such as Spark, presto, into the DataFusion core. https://github.com/apache/arrow-datafusion/pull/1987
   - Improved Scheduler to limit concurrently (i.e., don't scan 20 parquet files in parallel if you have only two cores) [From Influx](https://github.com/influxdata/influxdb_iox/issues/3994).
   - Together with several previous issues such as #64 #924
   
   I think it might be the right time to rethink how to divide query working sets, how to partition/repartition data among operators, and how we should schedule tasks.
   
   My opinion on this whole scheduler and execution framework is: (partly proposed in https://github.com/apache/arrow-datafusion/pull/1987#discussion_r825384514 and in our roadmap)
   - Adopt stage-based execution in DataFusion core.
       - Divide query execution plan based on "exchange" operators or "pipeline breakers" into a DAG of stages.
       - For each stage, group all operators into a `Task`. Processing data with operator logic serially, synced, pipelined inside each Task.
       - For the root stages that read data from files directly, partition input dataset based on a per-task size configuration. (similar to that of `input.split.maxSize` for MapReduce and similars for Spark/Presto). 
       - For non-root stages, we could either adopt a fixed `num_partition` or determine the number of partitions based on runtime generated size.
   
   - A shared scheduler framework for both DataFusion and Ballista.
     -  Schedule tasks based on stages dependency and schedule tasks based on available cores. 
   
   - Ultimately, a finer-grained execution for DataFusion core, as described in [Morsel-driven parallelism](https://15721.courses.cs.cmu.edu/spring2016/papers/p743-leis.pdf) and [Push-pull](https://www.cambridge.org/core/services/aop-cambridge-core/content/view/D67AE4899E87F4B5102F859B0FC02045/S0956796818000102a.pdf/div-class-title-push-versus-pull-based-loop-fusion-in-query-engines-div.pdf).
   
   
   By the method proposed above, we could also achieve:
    >   limit concurrently (don't scan 20 parquet files in parallel if you have only two cores)
   
   with the help of the TaskScheduler, and achieve:
   
     > can instead construct a more complex physical plan containing the necessary ProjectionExec, SchemaAdapter
   
   by an existing, sequentially executed, Task.
   
   
   cc @houqp @andygrove @liukun4515 @yahoNanJing @mingmwang. 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on issue #2079: RFC: More Granular File Operators

Posted by GitBox <gi...@apache.org>.
alamb commented on issue #2079:
URL: https://github.com/apache/arrow-datafusion/issues/2079#issuecomment-1077916193


   > Specifically I wonder if we might make it so that the ParquetExec, CsvExec operators handle a single file, and the plan stage within TableProvider::scan instead constructs a more complex physical plan containing the necessary ProjectionExec, SchemaAdapter operators as necessary.
   
   I like this idea (though I am somewhat biased, as I like / implemented a bunch of the IOx approach) :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] yjshen commented on issue #2079: RFC: More Granular File Operators

Posted by GitBox <gi...@apache.org>.
yjshen commented on issue #2079:
URL: https://github.com/apache/arrow-datafusion/issues/2079#issuecomment-1078958940


   Thanks for bringing it up. 
   
   Recently, we've encountered several different circumstances to deal with query execution parallelism: 
   - To parallelize the parquet file scan and reduce filesystem operation overhead; (The current parquet chunkReader API).
   - To incorporate task concept, which is widely available in distributed computation frameworks such as Spark, presto, into the DataFusion core. https://github.com/apache/arrow-datafusion/pull/1987
   - Improved Scheduler to limit concurrently (i.e., don't scan 20 parquet files in parallel if you have only two cores) [From Influx](https://github.com/influxdata/influxdb_iox/issues/3994).
   - Together with several previous issues such as #64 #924
   
   I think it might be the right time to rethink how to divide query working sets, how to partition/repartition data among operators, and how we should schedule tasks.
   
   My opinion on this whole scheduler and execution framework is: (proposed previously in https://github.com/apache/arrow-datafusion/pull/1987#discussion_r825384514)
   - Adopt stage-based execution in DataFusion core.
       - Divide query execution plan based on "exchange" operators or "pipeline breakers" into a DAG of stages.
       - For each stage, group all operators into a `Task`. Processing data with operator logic serially, synced, pipelined inside each Task.
       - For the root stages that read data from files directly, partition input dataset based on a per-task size configuration. (similar to that of `input.split.maxSize` for MapReduce and similars for Spark/Presto). 
       - For non-root stages, we could either adopt a fixed `num_partition` or determine the number of partitions based on runtime generated size.
   
   - A shared scheduler framework for both DataFusion and Ballista.
     -  Schedule tasks based on stages dependency and schedule tasks based on available cores. 
   
   - Ultimately, I'm expecting a finer-grained execution for DataFusion core, as described in [Morsel-driven parallelism](https://15721.courses.cs.cmu.edu/spring2016/papers/p743-leis.pdf) and [Push-pull](https://www.cambridge.org/core/services/aop-cambridge-core/content/view/D67AE4899E87F4B5102F859B0FC02045/S0956796818000102a.pdf/div-class-title-push-versus-pull-based-loop-fusion-in-query-engines-div.pdf).
   
   By the method proposed above, we could also achieve:
    >   limit concurrently (don't scan 20 parquet files in parallel if you have only two cores)
   
   with the help of the TaskScheduler, and achieve:
   
     > can instead construct a more complex physical plan containing the necessary ProjectionExec, SchemaAdapter
   
   by an existing, sequentially executed, Task.
   
   cc @houqp @andygrove @liukun4515 @yahoNanJing @mingmwang. 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org