Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/04/11 12:52:09 UTC

[GitHub] [arrow-datafusion] tustvold opened a new issue, #2199: Morsel-Driven Parallelism Using Rayon

tustvold opened a new issue, #2199:
URL: https://github.com/apache/arrow-datafusion/issues/2199

   A proposal for reformulating the parallelism story within DataFusion to use a [morsel-driven](https://db.in.tum.de/~leis/papers/morsels.pdf) approach based on [rayon](https://docs.rs/rayon/latest/rayon/). More details, background, and discussion can be found in the [proposal document](https://docs.google.com/document/d/1txX60thXn1tQO1ENNT8rwfU3cXLofa7ZccnvP4jD6AA); please feel free to comment there.
   
   The key highlights are:
   
   * Decouples parallelism from the partitioning expressed in the physical plan, allowing for:
     * Better handling of imbalanced partitions
     * Adaptive parallelism based on compute availability at execution time
     * Parallelism within a partition, such as decoding parquet columns in parallel, [parallel sort](https://docs.rs/rayon/latest/rayon/slice/trait.ParallelSliceMut.html), etc...
   * The first step to reducing the complexity associated with the current futures-based concurrency model
   * Improvements to thread-locality, observability and performance
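   
   As an illustration of the first highlight, here is a minimal sketch of morsel-style dispatch. It is not DataFusion's scheduler and it deliberately uses only `std` scoped threads with a shared atomic counter instead of rayon, so that it has no dependencies. The names `morsel_sum` and `MORSEL_ROWS`, and the morsel size itself, are assumptions made up for this example. The point it shows is that workers claim small fixed-size row ranges rather than whole partitions, so one oversized partition does not tie up a single worker.
   
   ```rust
   use std::sync::atomic::{AtomicUsize, Ordering};
   use std::thread;
   
   /// Rows handed to a worker at a time (hypothetical value for illustration).
   const MORSEL_ROWS: usize = 1024;
   
   /// Sum every value in `partitions` using `workers` threads.
   /// Each partition is cut into fixed-size morsels, and idle workers claim
   /// the next unprocessed morsel from a shared counter.
   fn morsel_sum(partitions: &[Vec<u64>], workers: usize) -> u64 {
       // Describe each morsel as (partition index, start row, end row).
       let mut morsels: Vec<(usize, usize, usize)> = Vec::new();
       for (p, rows) in partitions.iter().enumerate() {
           let mut start = 0;
           while start < rows.len() {
               let end = (start + MORSEL_ROWS).min(rows.len());
               morsels.push((p, start, end));
               start = end;
           }
       }
   
       // Index of the next unclaimed morsel, shared by all workers.
       let next = AtomicUsize::new(0);
   
       thread::scope(|s| {
           let mut handles = Vec::new();
           for _ in 0..workers.max(1) {
               handles.push(s.spawn(|| {
                   let mut local = 0u64;
                   loop {
                       // Claim one morsel; stop once the list is exhausted.
                       let i = next.fetch_add(1, Ordering::Relaxed);
                       let (p, start, end) = match morsels.get(i) {
                           Some(&m) => m,
                           None => break,
                       };
                       local += partitions[p][start..end].iter().sum::<u64>();
                   }
                   local
               }));
           }
           // Combine the per-worker partial sums.
           handles
               .into_iter()
               .map(|h| h.join().expect("worker panicked"))
               .sum::<u64>()
       })
   }
   ```
   
   With three partitions of very different sizes (for example 100,000 rows, 3 rows, and 0 rows), `morsel_sum(&partitions, 4)` still spreads the large partition across all four workers, whereas a one-thread-per-partition scheme would leave most of the work to a single thread.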
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb closed issue #2199: Morsel-Driven Parallelism Using Rayon

Posted by GitBox <gi...@apache.org>.
alamb closed issue #2199: Morsel-Driven Parallelism Using Rayon
URL: https://github.com/apache/arrow-datafusion/issues/2199




[GitHub] [arrow-datafusion] JasonLi-cn commented on issue #2199: Morsel-Driven Parallelism Using Rayon

Posted by GitBox <gi...@apache.org>.
JasonLi-cn commented on issue #2199:
URL: https://github.com/apache/arrow-datafusion/issues/2199#issuecomment-1263228061

   1. binary code
   
   ```rust
   use datafusion::arrow::record_batch::RecordBatch;
   use datafusion::arrow::util::pretty::print_batches;
   use datafusion::error::Result;
   use datafusion::prelude::*;
   use datafusion::scheduler::Scheduler;
   use futures::TryStreamExt;
   use std::env;
   
   #[tokio::main]
   async fn main() -> Result<()> {
       let name = "test_table";
       let mut args = env::args();
       args.next();
       let table_path = args.next().expect("parquet file");
       let sql = &args.next().expect("sql");
       let using_scheduler = args.next().is_some();
   
       // create local session context
       let config = SessionConfig::new()
           .with_information_schema(true)
           .with_target_partitions(4);
       let context = SessionContext::with_config(config);
   
       // register parquet file with the execution context
       context
           .register_parquet(name, &table_path, ParquetReadOptions::default())
           .await?;
   
       let task = context.task_ctx();
       let query = context.sql(sql).await.unwrap();
       let plan = query.create_physical_plan().await.unwrap();
   
       println!("Start query, using scheduler {}", using_scheduler);
       let now = std::time::Instant::now();
       let results = if using_scheduler {
           let scheduler = Scheduler::new(4);
           let stream = scheduler.schedule(plan, task).unwrap().stream();
           let results: Vec<RecordBatch> = stream.try_collect().await.unwrap();
           results
       } else {
           // Default execution path, via the helper defined below.
           plan_and_collect(&context, sql).await?
       };
       let elapsed = now.elapsed().as_millis();
       println!("End query, elapsed {} ms", elapsed);
       print_batches(&results)?;
       Ok(())
   }
   
   /// Execute sql
   async fn plan_and_collect(
       context: &SessionContext,
       sql: &str,
   ) -> Result<Vec<RecordBatch>> {
       context.sql(sql).await?.collect().await
   }
   ```
   
   2. test data
   
   - format: parquet
   - number of files: 4
   - rows: 16405852 * 4 = 65623408
   - number of columns: 6
   - schema: uint32, uint32, uint32, uint32, string, uint32
   
   3. test result
   
   SQLs:
   ```sql
   select count(distinct column0) from test_table;
   select * from test_table order by column5 limit 10;
   ```
   The performance is similar with and without the Scheduler! Is there a problem with how I am using it?
   
   @tustvold 
   




[GitHub] [arrow-datafusion] tustvold commented on issue #2199: Morsel-Driven Parallelism Using Rayon

Posted by GitBox <gi...@apache.org>.
tustvold commented on issue #2199:
URL: https://github.com/apache/arrow-datafusion/issues/2199#issuecomment-1263282629

   Yes, that is expected. I've had to park work on this for a bit in favour of some other things. See #2504 for the follow-on work.




[GitHub] [arrow-datafusion] JasonLi-cn commented on issue #2199: Morsel-Driven Parallelism Using Rayon

Posted by GitBox <gi...@apache.org>.
JasonLi-cn commented on issue #2199:
URL: https://github.com/apache/arrow-datafusion/issues/2199#issuecomment-1263332649

   > Yes, that is expected. I've had to park work on this for a bit in favour of some other things. See #2504 for the follow-on work.
   
   Ok, thanks!
   By the way, do you know the difference between [ClickHouse's Query Execution Pipeline](https://presentations.clickhouse.com/meetup24/5.%20Clickhouse%20query%20execution%20pipeline%20changes/#clickhouse-query-execution-pipeline) and DataFusion's execution model (which resembles a vectorized Volcano model)? And what are the advantages of ClickHouse's approach?

