Posted to github@arrow.apache.org by "devinjdangelo (via GitHub)" <gi...@apache.org> on 2023/07/24 22:08:30 UTC
[GitHub] [arrow-datafusion] devinjdangelo opened a new issue, #7079: [DataFrame] Parallel Write out of dataframe
devinjdangelo opened a new issue, #7079:
URL: https://github.com/apache/arrow-datafusion/issues/7079
### Is your feature request related to a problem or challenge?
Related to #6983. I noticed the same performance issue when writing a single large partition/file from a DataFrame. Only a single core is used and it can take quite a long time. When there are a small number of large partitions being written it would be ideal to leverage multiple cores, especially now that we are leveraging multipart ObjectStore uploads for writes (#6987).
### Describe the solution you'd like
This part of the write methods needs to process the RecordBatch stream in parallel (perhaps with try_for_each_concurrent):
```rust
while let Some(next_batch) = stream.next().await {
    let batch = next_batch?;
    writer.write(&batch).await?;
}
```
This could be nontrivial for stateful writers like `AsyncArrowWriter`. It also isn't clear to me immediately how the multipart context could be shared with concurrent access across threads.
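One way to picture the idea, as a rough sketch rather than DataFusion code: keep the CPU-bound encoding of each batch independent so it can run on several cores, and leave a single writer to append the encoded buffers in order. The example below uses only the standard library. `Batch`, `encode_batch`, and `write_parallel` are hypothetical names invented for illustration, and spawning one thread per batch is a simplification that a real implementation would bound.

```rust
use std::io::Write;
use std::thread;

/// Hypothetical stand-in for a RecordBatch: just a vector of rows.
type Batch = Vec<i64>;

/// Hypothetical CPU-bound encoder: one line of text per row.
fn encode_batch(batch: &Batch) -> Vec<u8> {
    let mut buf = Vec::with_capacity(batch.len() * 4);
    for row in batch {
        writeln!(buf, "{row}").expect("writing to a Vec cannot fail");
    }
    buf
}

/// Encode every batch on its own scoped thread, then write the encoded
/// buffers to `sink` sequentially, in the original batch order.
fn write_parallel<W: Write>(batches: &[Batch], sink: &mut W) -> std::io::Result<()> {
    let encoded: Vec<Vec<u8>> = thread::scope(|s| {
        // Spawn all encoders first so they run concurrently.
        let handles: Vec<_> = batches
            .iter()
            .map(|b| s.spawn(move || encode_batch(b)))
            .collect();
        // Joining the handles in spawn order preserves batch order.
        handles
            .into_iter()
            .map(|h| h.join().expect("encoder thread panicked"))
            .collect()
    });
    // Only this part touches the (stateful) sink, and it is single-threaded.
    for buf in &encoded {
        sink.write_all(buf)?;
    }
    Ok(())
}
```

This split only works when each batch can be encoded without state from the previous one, which is why a stateful writer such as `AsyncArrowWriter` is harder to parallelize this way.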
### Describe alternatives you've considered
You can repartition your DataFrame to more partitions and write out smaller files, but sometimes you really do want large files to be written.
### Additional context
To reproduce this (adapted from @alamb's example in #6983):
```
cd datafusion/benchmarks
./bench.sh data tpch10
```
```rust
use std::{io::Error, time::Instant, sync::Arc};
use datafusion::prelude::*;
use chrono;
use datafusion_common::DataFusionError;
use object_store::local::LocalFileSystem;
use url::Url;
const FILENAME: &str = "/home/dev/arrow-datafusion/benchmarks/data/tpch_sf10/lineitem/part-0.parquet";
#[tokio::main]
async fn main() -> Result<(), DataFusionError> {
    let _ctx = SessionContext::new();
    let local = Arc::new(LocalFileSystem::new());
    let local_url = Url::parse("file://local").unwrap();
    _ctx.runtime_env().register_object_store(&local_url, local);
    let _read_options = ParquetReadOptions { file_extension: ".parquet", table_partition_cols: vec!(), parquet_pruning: None, skip_metadata: None };
    let _df = _ctx.read_parquet(FILENAME, _read_options).await.unwrap();
    let start = Instant::now();
    println!("datafusion start -> {:?}", chrono::offset::Local::now());
    _df.write_parquet("file://local/home/dev/arrow-datafusion/test_out/", None).await?;
    let elapsed = Instant::now() - start;
    println!("datafusion end -> {:?} {elapsed:?}", chrono::offset::Local::now());
    Ok(())
}
```
This took 379s on my machine.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [arrow-datafusion] alamb closed issue #7079: [DataFrame] Parallel Write out of dataframe
Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb closed issue #7079: [DataFrame] Parallel Write out of dataframe
URL: https://github.com/apache/arrow-datafusion/issues/7079
[GitHub] [arrow-datafusion] alamb commented on issue #7079: [DataFrame] Parallel Write out of dataframe
Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on issue #7079:
URL: https://github.com/apache/arrow-datafusion/issues/7079#issuecomment-1653588068
That looks awesome @devinjdangelo
I agree there is significant room for improvement when loading data in parallel. Can't wait to see it get implemented ❤️
[GitHub] [arrow-datafusion] alamb commented on issue #7079: [DataFrame] Parallel Write out of dataframe
Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on issue #7079:
URL: https://github.com/apache/arrow-datafusion/issues/7079#issuecomment-1652250526
Thank you for this writeup @devinjdangelo
I think the best architectural decision would be to move DataFrame to use the same (partially implemented) code that handles writing into files (e.g. for `INSERT INTO ...`) -- https://docs.rs/datafusion/latest/datafusion/logical_expr/struct.DmlStatement.html
We could then handle all the details of multi part writes, parallelization, etc there.
[GitHub] [arrow-datafusion] devinjdangelo commented on issue #7079: [DataFrame] Parallel Write out of dataframe
Posted by "devinjdangelo (via GitHub)" <gi...@apache.org>.
devinjdangelo commented on issue #7079:
URL: https://github.com/apache/arrow-datafusion/issues/7079#issuecomment-1652538831
Thanks @alamb for the pointer. I will take a look at that other write code!
I was able to develop a POC implementation on the existing write_json method to prove out the potential for a significant speed up. I changed the above test code to separate out the read/write timings:
```rust
use std::{io::Error, time::Instant, sync::Arc};
use datafusion::prelude::*;
use chrono;
use datafusion_common::DataFusionError;
use object_store::local::LocalFileSystem;
use url::Url;
const FILENAME: &str = "/home/dev/arrow-datafusion/benchmarks/data/tpch_sf10/lineitem/part-0.parquet";
#[tokio::main]
async fn main() -> Result<(), DataFusionError> {
    let _ctx = SessionContext::new();
    let local = Arc::new(LocalFileSystem::new());
    let local_url = Url::parse("file://local").unwrap();
    _ctx.runtime_env().register_object_store(&local_url, local);
    let _read_options = ParquetReadOptions { file_extension: ".parquet", table_partition_cols: vec!(), parquet_pruning: None, skip_metadata: None };
    let start = Instant::now();
    let _df = _ctx.read_parquet(FILENAME, _read_options).await.unwrap()
        // select a few columns with types compatible with write_json method
        .select_columns(&["l_orderkey", "l_partkey", "l_receiptdate"])?.cache().await?;
    let elapsed = Instant::now() - start;
    println!("read parquet to memory took -> {elapsed:?}");
    let start2 = Instant::now();
    _df.write_json("file://local/home/dev/arrow-datafusion/test_out/").await?;
    let elapsed2 = Instant::now() - start2;
    println!("write as json to disk took -> {elapsed2:?}");
    Ok(())
}
```
As a baseline, the current `write_json` on main results in the following timings:
read parquet to memory took -> 10.985273516s
write as json to disk took -> 191.64463431s
I modified the plan_to_json method as follows:
```rust
pub async fn plan_to_json(
    task_ctx: Arc<TaskContext>,
    plan: Arc<dyn ExecutionPlan>,
    path: impl AsRef<str>,
) -> Result<()> {
    let path = path.as_ref();
    let parsed = ListingTableUrl::parse(path)?;
    let object_store_url = parsed.object_store();
    let store = task_ctx.runtime_env().object_store(&object_store_url)?;
    let mut join_set = JoinSet::new();
    for i in 0..plan.output_partitioning().partition_count() {
        let storeref = store.clone();
        let plan: Arc<dyn ExecutionPlan> = plan.clone();
        let filename = format!("{}/part-{i}.json", parsed.prefix());
        let file = object_store::path::Path::parse(filename)?;
        let mut stream = plan.execute(i, task_ctx.clone())?;
        join_set.spawn(async move {
            let (_, mut multipart_writer) = storeref.put_multipart(&file).await?;
            let mut inner_join_set = JoinSet::new();
            while let Some(batch) = stream.try_next().await? {
                inner_join_set.spawn(async move {
                    let buffer = Vec::with_capacity(1024);
                    let mut writer = json::LineDelimitedWriter::new(buffer);
                    writer.write(&batch)?;
                    let r: Result<Vec<u8>, DataFusionError> = Ok(writer.into_inner());
                    r
                });
            }
            while let Some(result) = inner_join_set.join_next().await {
                match result {
                    Ok(r) => {
                        let batch = r?;
                        multipart_writer.write_all(&batch).await?;
                    },
                    Err(e) => {
                        if e.is_panic() {
                            std::panic::resume_unwind(e.into_panic());
                        } else {
                            unreachable!();
                        }
                    }
                }
            }
            multipart_writer
                .shutdown()
                .await
                .map_err(DataFusionError::from)
        });
    }
    while let Some(result) = join_set.join_next().await {
        match result {
            Ok(res) => res?, // propagate DataFusion error
            Err(e) => {
                if e.is_panic() {
                    std::panic::resume_unwind(e.into_panic());
                } else {
                    unreachable!();
                }
            }
        }
    }
    Ok(())
}
```
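One thing to keep in mind with this approach: draining a `JoinSet` with `join_next` yields tasks in the order they finish, not the order they were spawned, so the serialized batches may land in the file in a different order than they came off the stream. If row order matters, a small reorder buffer is one possible fix. The sketch below shows the idea with the standard library only. It is not the POC itself: `write_in_order` is a hypothetical helper, plain threads stand in for tokio tasks, and upper-casing the bytes stands in for the real serialization work.

```rust
use std::collections::BTreeMap;
use std::io::Write;
use std::sync::mpsc;
use std::thread;

/// Process batches concurrently, but write results to `sink` in input order.
fn write_in_order<W: Write>(batches: Vec<Vec<u8>>, sink: &mut W) -> std::io::Result<()> {
    let (tx, rx) = mpsc::channel::<(usize, Vec<u8>)>();
    for (idx, batch) in batches.into_iter().enumerate() {
        let tx = tx.clone();
        thread::spawn(move || {
            // Placeholder for CPU-bound serialization of one batch.
            let encoded = batch.to_ascii_uppercase();
            // Tag the result with its original position; ignore the error
            // that occurs if the receiver has already gone away.
            let _ = tx.send((idx, encoded));
        });
    }
    // Drop the original sender so the receive loop ends once all workers finish.
    drop(tx);

    // Results arrive in completion order; hold early arrivals until their turn.
    let mut pending: BTreeMap<usize, Vec<u8>> = BTreeMap::new();
    let mut next = 0usize;
    for (idx, buf) in rx {
        pending.insert(idx, buf);
        while let Some(buf) = pending.remove(&next) {
            sink.write_all(&buf)?;
            next += 1;
        }
    }
    Ok(())
}
```

The trade-off is memory: a slow early batch forces later results to sit in the buffer until it completes, so a real implementation would likely also cap the number of in-flight tasks.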
Now the timings are:
read parquet to memory took -> 10.990137608s
write as json to disk took -> 18.332761845s