You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "devinjdangelo (via GitHub)" <gi...@apache.org> on 2023/07/24 22:08:30 UTC

[GitHub] [arrow-datafusion] devinjdangelo opened a new issue, #7079: [DataFrame] Parallel Write out of dataframe

devinjdangelo opened a new issue, #7079:
URL: https://github.com/apache/arrow-datafusion/issues/7079

   ### Is your feature request related to a problem or challenge?
   
   Related to #6983. I noticed the same performance issue when writing a single large partition/file form a DataFrame. Only a single core is used and it can take quite a long time. When there are a small number of large partitions being written it would be ideal to leverage multiple cores, especially now that we are leveraging multipart  ObjectStore uploads for writes #6987.
   
   
   ### Describe the solution you'd like
   
   This part of the write methods needs to process the RecordBatch stream in parallel (perhaps with try_for_each_concurrent):
   
   ```rust
   while let Some(next_batch) = stream.next().await {
                   let batch = next_batch?;
                   writer.write(&batch).await?;
               }
   ```
   
   This could be nontrivial for stateful writers like `AsyncArrowWriter`. It also isn't clear to me immediately how the multipart context could be shared with concurrent access across threads.
   
   ### Describe alternatives you've considered
   
   You can repartition your DataFrame to more partitions and write out smaller files, but sometimes you really do want large files to be written.
   
   ### Additional context
   
   To reproduce this (adapted from @alamb's example in #6983) :
   
   ```
   cd datafusion/benchmarks
   ./bench.sh data tpch10
   ```
   
   ```rust
   use std::{io::Error, time::Instant, sync::Arc};
   use datafusion::prelude::*;
   use chrono;
   use datafusion_common::DataFusionError;
   use object_store::local::LocalFileSystem;
   use url::Url;
   
   const FILENAME: &str = "/home/dev/arrow-datafusion/benchmarks/data/tpch_sf10/lineitem/part-0.parquet";
   
   #[tokio::main]
   async fn main() -> Result<(), DataFusionError> {
       let _ctx = SessionContext::new();
       let local = Arc::new(LocalFileSystem::new());
       let local_url = Url::parse("file://local").unwrap();
       _ctx.runtime_env().register_object_store(&local_url, local);
   
       let _read_options = ParquetReadOptions { file_extension: ".parquet", table_partition_cols: vec!(), parquet_pruning: None, skip_metadata: None };
       let _df = _ctx.read_parquet(FILENAME, _read_options).await.unwrap();
   
       let start = Instant::now();
       println!("datafusion start -> {:?}", chrono::offset::Local::now());
   
       _df.write_parquet("file://local/home/dev/arrow-datafusion/test_out/", None).await?;
       let elapsed = Instant::now() - start;
       println!("datafusion end -> {:?} {elapsed:?}", chrono::offset::Local::now());
       Ok(())
   }
   ```
   
   This took 379s on my machine.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb closed issue #7079: [DataFrame] Parallel Write out of dataframe

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb closed issue #7079: [DataFrame] Parallel Write out of dataframe 
URL: https://github.com/apache/arrow-datafusion/issues/7079


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on issue #7079: [DataFrame] Parallel Write out of dataframe

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on issue #7079:
URL: https://github.com/apache/arrow-datafusion/issues/7079#issuecomment-1653588068

   That looks awesome @devinjdangelo 
   
   I agree there is significant room for improvement when loading data in parallel. Can't wait to see it get implemented ❤️ 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on issue #7079: [DataFrame] Parallel Write out of dataframe

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on issue #7079:
URL: https://github.com/apache/arrow-datafusion/issues/7079#issuecomment-1652250526

   Thank you for this writeup @devinjdangelo 
   
   I think the best architectural decision would be to move DataFrame to use the same (partially implemented) code that handles writing into files (e.g. for `INSERT INTO ...`) -- https://docs.rs/datafusion/latest/datafusion/logical_expr/struct.DmlStatement.html
   
    We could then handle all the details of multi part writes, parallelization, etc there. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] devinjdangelo commented on issue #7079: [DataFrame] Parallel Write out of dataframe

Posted by "devinjdangelo (via GitHub)" <gi...@apache.org>.
devinjdangelo commented on issue #7079:
URL: https://github.com/apache/arrow-datafusion/issues/7079#issuecomment-1652538831

   Thanks @alamb for the pointer. I will take a look at that other write code! 
   
   I was able to develop a POC implementation on the existing write_json method to prove out the potential for a significant speed up. I changed the above test code to separate out the read/write timings:
   
   ```rust
   use std::{io::Error, time::Instant, sync::Arc};
   use datafusion::prelude::*;
   use chrono;
   use datafusion_common::DataFusionError;
   use object_store::local::LocalFileSystem;
   use url::Url;
   
   const FILENAME: &str = "/home/dev/arrow-datafusion/benchmarks/data/tpch_sf10/lineitem/part-0.parquet";
   
   #[tokio::main]
   async fn main() -> Result<(), DataFusionError> {
       let _ctx = SessionContext::new();
       let local = Arc::new(LocalFileSystem::new());
       let local_url = Url::parse("file://local").unwrap();
       _ctx.runtime_env().register_object_store(&local_url, local);
   
       let _read_options = ParquetReadOptions { file_extension: ".parquet", table_partition_cols: vec!(), parquet_pruning: None, skip_metadata: None };
   
       let start = Instant::now();
       let _df = _ctx.read_parquet(FILENAME, _read_options).await.unwrap()
           //select a few columns with types compatible with write_json method
           .select_columns(&["l_orderkey", "l_partkey", "l_receiptdate"])?.cache().await?;
       let elapsed = Instant::now() - start;
       println!("read parquet to memory took -> {elapsed:?}");
   
       let start2 = Instant::now();
       _df.write_json("file://local/home/dev/arrow-datafusion/test_out/").await?;
       let elapsed2 = Instant::now() - start2;
       println!("write as json to disk took -> {elapsed2:?}");
       Ok(())
   }
   ```
   
   As a baseline, the current `write_json` on main results in the following timings:
   
   read parquet to memory took -> 10.985273516s
   write as json to disk took -> 191.64463431s
   
   I modified the plan_to_json method as follows:
   
   ```rust
   pub async fn plan_to_json(
       task_ctx: Arc<TaskContext>,
       plan: Arc<dyn ExecutionPlan>,
       path: impl AsRef<str>,
   ) -> Result<()> {
       let path = path.as_ref();
       let parsed = ListingTableUrl::parse(path)?;
       let object_store_url = parsed.object_store();
       let store = task_ctx.runtime_env().object_store(&object_store_url)?;
       let mut join_set = JoinSet::new();
       for i in 0..plan.output_partitioning().partition_count() {
           let storeref = store.clone();
           let plan: Arc<dyn ExecutionPlan> = plan.clone();
           let filename = format!("{}/part-{i}.json", parsed.prefix());
           let file = object_store::path::Path::parse(filename)?;
   
           let mut stream = plan.execute(i, task_ctx.clone())?;
           join_set.spawn(async move {
               let (_, mut multipart_writer) = storeref.put_multipart(&file).await?;
               
               let mut inner_join_set = JoinSet::new();
               while let Some(batch) = stream.try_next().await?{
                   inner_join_set.spawn(async move {
                       let buffer = Vec::with_capacity(1024);
                       let mut writer = json::LineDelimitedWriter::new(buffer);
                       writer.write(&batch)?;
                       let r: Result<Vec<u8>, DataFusionError> = Ok(writer.into_inner());
                       r
                   });
               }
   
               while let Some(result) = inner_join_set.join_next().await{
                   match result {
                       Ok(r) => {
                           let batch = r?;
                           multipart_writer.write_all(&batch).await?;
                       },
                       Err(e) => {
                           if e.is_panic() {
                           std::panic::resume_unwind(e.into_panic());
                               } else {
                                   unreachable!();
                               }
                           }
                   }
               }
   
               multipart_writer
                   .shutdown()
                   .await
                   .map_err(DataFusionError::from)
           });
       }
   
       while let Some(result) = join_set.join_next().await {
           match result {
               Ok(res) => res?, // propagate DataFusion error
               Err(e) => {
                   if e.is_panic() {
                       std::panic::resume_unwind(e.into_panic());
                   } else {
                       unreachable!();
                   }
               }
           }
       }
   
       Ok(())
   }
   ```
   
   Now the timings are:
   
   read parquet to memory took -> 10.990137608s
   write as json to disk took -> 18.332761845s


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org