Posted to github@arrow.apache.org by "lazear (via GitHub)" <gi...@apache.org> on 2024/03/09 19:43:23 UTC

[I] Performance traps with arrow/parquet? [arrow-rs]

lazear opened a new issue, #5490:
URL: https://github.com/apache/arrow-rs/issues/5490

   **Which part is this question about**
   
   Library API / documentation
   
   **Describe your question**
   
   I am working on some code that is streaming rows of data from S3, using either CSV or parquet files as the backing format and converting to JSON.  Generally, I am fetching groups of 50-250 rows at a time in a simple OFFSET/LIMIT pattern, and filtering on a couple of columns.
   
   The parquet/arrow-rs crate is substantially slower than using async-csv/serde-json. Same dataset, just converted to CSV or Parquet. I am compiling in release mode.
   
   | Format | No filter | With filter |
   | ---------- | ------------ | -------------- |
   | CSV | 70 ms | 80 ms |
   | Parquet | 800 ms | 13500 ms |
   
   Am I doing something terribly wrong? Is there a better way to accomplish this task?
   I have pasted the code I'm using below.
   
   **Additional context**
   
   ```rs
   /// example filter
   impl ResultFilter {
       fn dummy_filter(mask: ProjectionMask, s: String) -> Box<dyn ArrowPredicate> {
           Box::new(ArrowPredicateFn::new(mask, move |batch: RecordBatch| {
               let array = batch
                   .column_by_name("dummy")
                   .ok_or_else(|| ArrowError::SchemaError("invalid schema".into()))?;
   
               let filter = arrow_array::StringArray::new_scalar(s.to_string());
               arrow::compute::kernels::comparison::contains(array, &filter)
           }))
       }
   }
   
   /// Take a user-specified set of columns to parse from the file, and some filters (enum) that can be converted into ArrowPredicateFn
   pub async fn read_parquet_columns_arrow<'a, F>(
       mut file: F,
       columns: &'a [&str],
       filters: Vec<ResultFilter>,
       offset: usize,
       limit: usize,
   ) -> anyhow::Result<impl Stream<Item = Vec<u8>> + 'a>
   where
       F: AsyncFileReader + Unpin + Send + 'static,
   {
       let meta = ArrowReaderMetadata::load_async(&mut file, Default::default()).await?;
   
       let filter_columns = filters
           .iter()
           .map(ResultFilter::required_column)
           .collect::<Vec<_>>();
   
       let system_columns = meta
           .parquet_schema()
           .root_schema()
           .get_fields()
           .iter()
           .enumerate()
           .filter_map(|(idx, col)| {
               if filter_columns.contains(&col.name()) {
                   Some(idx)
               } else {
                   None
               }
           })
           .collect::<Vec<usize>>();
   
       let user_columns = meta
           .parquet_schema()
           .root_schema()
           .get_fields()
           .iter()
           .enumerate()
           .filter_map(|(idx, col)| {
               if columns.contains(&col.name()) {
                   Some(idx)
               } else {
                   None
               }
           })
           .collect::<Vec<_>>();
   
       let mask = ProjectionMask::roots(
           meta.parquet_schema(),
           user_columns
               .as_slice()
               .into_iter()
               .chain(&system_columns)
               .copied(),
       );
       let row_filter = RowFilter::new(filters.into_iter().map(|f| f.build(mask.clone())).collect());
   
       let arrow_reader = ParquetRecordBatchStreamBuilder::new_with_metadata(file, meta.clone())
           .with_row_filter(row_filter)
           .with_offset(offset)
           .with_limit(limit)
           .with_projection(mask)
           .build()?;
   
       // let roots = &roots;
       Ok(arrow_reader
           .filter_map(|f| futures::future::ready(f.ok()))
           .filter_map(move |batch| {
               let inner = || {
                   let mask = columns
                       .iter()
                       .filter_map(|name| Some(batch.schema().column_with_name(name)?.0))
                       .collect::<Vec<_>>();
   
                   let batch = batch.project(&mask).ok()?;
   
                   let buf = Vec::new();
                   let mut wtr = arrow_json::ArrayWriter::new(buf);
                   wtr.write(&batch).ok()?;
                   wtr.finish().ok()?;
   
                   Some(wtr.into_inner())
               };
   
               futures::future::ready(inner())
           }))
   }
   ```




Re: [I] Performance traps with arrow/parquet? [arrow-rs]

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on issue #5490:
URL: https://github.com/apache/arrow-rs/issues/5490#issuecomment-1986991258

   Typically you should aim for row groups in the high hundreds of thousands of rows, unless you have very large strings or a very wide schema.
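   
   A minimal sketch of that target using the parquet crate's `WriterProperties` (the schema, file path, and function name here are illustrative, not from this issue; with the low-level ColumnWriter/SerializedFileWriter path mentioned elsewhere in this thread, the boundary is instead wherever you close each row group, so the same row-count target applies to how many rows you buffer per group):
   
   ```rs
   use std::fs::File;
   use std::sync::Arc;
   
   use arrow_array::{ArrayRef, Int64Array, RecordBatch};
   use arrow_schema::{DataType, Field, Schema};
   use parquet::arrow::ArrowWriter;
   use parquet::file::properties::WriterProperties;
   
   fn write_example(path: &str) -> anyhow::Result<()> {
       let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
       let batch = RecordBatch::try_new(
           schema.clone(),
           vec![Arc::new(Int64Array::from_iter_values(0..1_000_000)) as ArrayRef],
       )?;
   
       // Aim for row groups in the high hundreds of thousands of rows.
       let props = WriterProperties::builder()
           .set_max_row_group_size(500_000)
           .build();
   
       let mut writer = ArrowWriter::try_new(File::create(path)?, schema, Some(props))?;
       writer.write(&batch)?;
       writer.close()?;
       Ok(())
   }
   ```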




Re: [I] Performance traps with arrow/parquet? [arrow-rs]

Posted by "lazear (via GitHub)" <gi...@apache.org>.
lazear commented on issue #5490:
URL: https://github.com/apache/arrow-rs/issues/5490#issuecomment-1986992161

   Gotcha - that's how I have it set up now.
   
   For streaming from S3, I set up a small test case:
   
   ```rs
       let now = std::time::Instant::now();
       while let Some(page) = stream.next().await {
           eprintln!("time to response: {}", (std::time::Instant::now() - now).as_millis());
       }
       eprintln!("time to stream close: {}", (std::time::Instant::now() - now).as_millis());
   ```
   
   With no filters:
   ```
   time to response: 462
   time to stream close: 463
   ```
   
   With filters applied:
   ```
   time to response: 495
   time to stream close: 11727
   ```
   
   So the stream is returning items at the same rate, but hangs after awaiting the stream if there are filters applied.




Re: [I] Performance traps with arrow/parquet? [arrow-rs]

Posted by "lazear (via GitHub)" <gi...@apache.org>.
lazear commented on issue #5490:
URL: https://github.com/apache/arrow-rs/issues/5490#issuecomment-2006047407

   OK, so I did some profiling... the biggest culprit appears to be farther down in my stack (gzip compression of the streaming newline-delimited JSON), which delays returning any responses until the stream is finished; when filters are applied, that requires the entire file to be parsed first.




Re: [I] Performance traps with arrow/parquet? [arrow-rs]

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on issue #5490:
URL: https://github.com/apache/arrow-rs/issues/5490#issuecomment-1986993995

   Perhaps you could use a CPU profiler such as hotspot to identify where it is spending time. My suspicion is that because the predicate is evaluated before the offset and limit, it has to do a lot of extra work to find the n'th row that passes the predicate.




Re: [I] Performance traps with arrow/parquet? [arrow-rs]

Posted by "lazear (via GitHub)" <gi...@apache.org>.
lazear closed issue #5490: Performance traps with arrow/parquet?
URL: https://github.com/apache/arrow-rs/issues/5490




Re: [I] Performance traps with arrow/parquet? [arrow-rs]

Posted by "lazear (via GitHub)" <gi...@apache.org>.
lazear commented on issue #5490:
URL: https://github.com/apache/arrow-rs/issues/5490#issuecomment-2006132900

   For my own reference as well, this is related to https://github.com/tower-rs/tower-http/issues/292
   
   A subset of the filters I am applying causes very few rows to be returned, so the async-compression layer isn't getting enough data flushed through it to actually stream.
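   
   A minimal sketch of the kind of workaround this points at, not tower-http's actual internals: write the newline-delimited JSON through async-compression's `GzipEncoder` and flush after every batch so small result sets still produce output (assumes the async-compression crate with its "tokio" and "gzip" features; names are illustrative):
   
   ```rs
   use async_compression::tokio::write::GzipEncoder;
   use tokio::io::{AsyncWrite, AsyncWriteExt};
   
   // `batches` stands in for the Vec<u8> items produced by the parquet stream above.
   async fn write_ndjson_gzip<W>(
       sink: W,
       batches: impl IntoIterator<Item = Vec<u8>>,
   ) -> std::io::Result<()>
   where
       W: AsyncWrite + Unpin,
   {
       let mut encoder = GzipEncoder::new(sink);
       for batch in batches {
           encoder.write_all(&batch).await?;
           encoder.write_all(b"\n").await?;
           // Flush so the compressor emits what it has buffered so far instead
           // of waiting for enough input to fill an internal block.
           encoder.flush().await?;
       }
       // Finish the gzip stream (writes the trailer) and flush the inner writer.
       encoder.shutdown().await?;
       Ok(())
   }
   ```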




Re: [I] Performance traps with arrow/parquet? [arrow-rs]

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on issue #5490:
URL: https://github.com/apache/arrow-rs/issues/5490#issuecomment-1986966644

   Parquet is a block-oriented data format, where the lowest addressable unit may contain hundreds of thousands or even millions of rows, so reading a small selection of rows is relatively expensive. When reading just 100 rows, the reader may have to decode far more than that in order to find the rows of interest.
   
   Some thoughts:
   
   * You should enable reading the page index, which will help push down the RowSelection to the page level (a minimal sketch follows this list) - https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.ArrowReaderOptions.html#method.with_page_index
   * I would recommend using the object_store integration which is able to better perform vectorised reads
   * I recommend against pushing down filters that don't result in consecutive runs of matching rows, instead applying filters after the fact
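   
   A minimal sketch of the page-index option applied to the reader code in the issue (the helper name is made up; it is a drop-in for the `load_async(&mut file, Default::default())` call above):
   
   ```rs
   use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
   use parquet::arrow::async_reader::AsyncFileReader;
   
   /// Load the parquet metadata with the page index enabled so that the
   /// RowSelection produced by offset/limit/filters can be pushed down to
   /// individual pages rather than whole row groups.
   async fn load_metadata_with_page_index<F>(
       file: &mut F,
   ) -> parquet::errors::Result<ArrowReaderMetadata>
   where
       F: AsyncFileReader + Unpin + Send,
   {
       let options = ArrowReaderOptions::new().with_page_index(true);
       ArrowReaderMetadata::load_async(file, options).await
   }
   ```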
   
   




Re: [I] Performance traps with arrow/parquet? [arrow-rs]

Posted by "lazear (via GitHub)" <gi...@apache.org>.
lazear commented on issue #5490:
URL: https://github.com/apache/arrow-rs/issues/5490#issuecomment-1986985334

   Thanks for the quick response.
   
   After doing some more benchmarking locally, I think the slowdown might actually be restricted to streaming over the network. Reading from a local file and just dumping the output to stdout, I get approximately the same time for CSV and Parquet with both the Rust code and python-polars (20-60 ms).
   
   Do you have any recommendations for row group size for this kind of use case? I am manually writing the parquet files using the low-level ColumnWriter API.
   
   - In terms of streaming, this is for an API integration/frontend datatable with pagination. I can look into whether it's possible to use only streaming instead, though; it's a good idea.
   - I am already using the object-store integration, but I haven't tried enabling the page index.
   - The filters I'm applying should result in consecutive runs of matching rows most of the time, but I can also apply them in application code after receiving the rows (a sketch of that follows).
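   
   A minimal sketch of what applying the filter after decoding could look like, reusing the `dummy`/`contains` example from the issue body (the helper name is illustrative):
   
   ```rs
   use arrow::compute::filter_record_batch;
   use arrow::error::ArrowError;
   use arrow_array::{RecordBatch, StringArray};
   
   /// Decode the projected batch first, then keep only the rows whose `dummy`
   /// column contains `needle`, using the same kernels as the pushed-down filter.
   fn filter_after_decode(batch: &RecordBatch, needle: &str) -> Result<RecordBatch, ArrowError> {
       let column = batch
           .column_by_name("dummy")
           .ok_or_else(|| ArrowError::SchemaError("invalid schema".into()))?;
       let needle = StringArray::new_scalar(needle.to_string());
       let mask = arrow::compute::kernels::comparison::contains(column, &needle)?;
       // Keep only the rows whose mask entry is true.
       filter_record_batch(batch, &mask)
   }
   ```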

