Posted to issues@arrow.apache.org by "Hor911 (via GitHub)" <gi...@apache.org> on 2023/03/05 21:52:07 UTC

[GitHub] [arrow] Hor911 opened a new issue, #34460: [C++] Split arrow::FileReader::ReadRowGroups() to 2 methods for flexible async IO

Hor911 opened a new issue, #34460:
URL: https://github.com/apache/arrow/issues/34460

   ### Describe the enhancement requested
   
   Current implementation of arrow::FileReader::ReadRowGroups() does 2 things:
   
   1. Triggers pre-buffering if enabled
   2. Blocks until all data is received and decodes it
   
   This synchronous interface complicates use in environments where additional threads are undesirable, so I suggest splitting this method:
   
   ```
   Status FileReader::ReadRowGroups(const std::vector<int>& row_groups,
                                        const std::vector<int>& column_indices,
                                        std::shared_ptr<Table>* out);
   ```
   
   into these two:
   
   ```
   Status FileReader::WillNeedRowGroups(const std::vector<int>& row_groups,
                                       const std::vector<int>& column_indices);
   Status FileReader::DecodeRowGroups(const std::vector<int>& row_groups,
                                       const std::vector<int>& column_indices,
                                       std::shared_ptr<::arrow::Table>* out);
   ```
   
   WillNeedRowGroups() will trigger pre-buffering, while DecodeRowGroups() will block and read the data.
   The behavior of ReadRowGroups() will remain the same and will be equivalent to calling the two new methods sequentially.
   
   The new methods allow reading data in a completely asynchronous way with a custom arrow::io::RandomAccessFile implementation, as follows:
   
   1. Lazy pre-buffering must be enabled:
   ```
   parquet::ArrowReaderProperties properties;
   properties.set_cache_options(arrow::io::CacheOptions::LazyDefaults());
   properties.set_pre_buffer(true);
   ```
   2. Application code calls arrow::FileReader::WillNeedRowGroups(...) to start pre-buffering.
   3. The custom arrow::io::RandomAccessFile receives the WillNeed(...) call, starts async IO, and returns instantly.
   4. The application continues running until all data is loaded into the RandomAccessFile instance.
   5. The application calls arrow::FileReader::DecodeRowGroups(...) to get the actual data. It uses Futures to wait, but since the data is already in place, all arrow::io::RandomAccessFile::ReadAt/ReadAsync calls are answered instantly and all Futures complete without blocking.
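   The five steps above can be sketched as a self-contained analogy using plain std::future, with no Arrow dependency. `PrefetchingFile` and its methods are hypothetical stand-ins for a custom arrow::io::RandomAccessFile and the proposed reader methods, not real Arrow APIs:
   
   ```
   #include <cassert>
   #include <chrono>
   #include <future>
   #include <numeric>
   #include <thread>
   #include <vector>
   
   // Hypothetical stand-in for a custom arrow::io::RandomAccessFile:
   // WillNeed() kicks off async IO and returns immediately; ReadAt()
   // completes without waiting once the background load has finished.
   class PrefetchingFile {
    public:
     void WillNeed() {
       data_ = std::async(std::launch::async, [] {
         std::this_thread::sleep_for(std::chrono::milliseconds(50));  // fake IO
         std::vector<int> v(8);
         std::iota(v.begin(), v.end(), 0);  // pretend row-group bytes
         return v;
       });
     }
     std::vector<int> ReadAt() { return data_.get(); }  // instant if IO is done
    private:
     std::future<std::vector<int>> data_;
   };
   
   int main() {
     PrefetchingFile file;
     file.WillNeed();  // steps 2-3: start async IO, return instantly
     // step 4: the application keeps doing other work here
     int sum = 0;
     for (int x : file.ReadAt()) sum += x;  // step 5: data already cached
     assert(sum == 28);
     return 0;
   }
   ```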
   
   Another great technique for increasing performance on high-latency storage (yes, it's S3) is read-ahead:
   
   1. Create N instances of arrow::FileReader.
   2. Start pre-buffering N row groups, one per reader, with a WillNeedRowGroups() call.
   3. As soon as the data is loaded (into the RandomAccessFile), fetch it with DecodeRowGroups().
   4. Then call WillNeedRowGroups() again on that reader to prefetch the next row group.
   
   This makes it possible to read large Parquet files quickly in a stream-like fashion, with performance comparable to a single ReadRowGroups() call of the current API over the complete list of row groups, but without caching the whole file in memory.
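   The rotating-reader pipeline can be sketched in the same self-contained way; `Prefetch()` stands in for WillNeedRowGroups() plus the async IO it triggers, future::get() stands in for DecodeRowGroups(), and all names and constants are illustrative:
   
   ```
   #include <cassert>
   #include <future>
   #include <vector>
   
   // Fake "load one row group": returns its decoded value asynchronously.
   std::future<int> Prefetch(int row_group) {
     return std::async(std::launch::async, [row_group] { return row_group * 10; });
   }
   
   int main() {
     const int kRowGroups = 6, kReaders = 2;
     std::vector<std::future<int>> inflight;
     int next = 0, total = 0;
     // Warm up: one in-flight prefetch per reader slot.
     for (; next < kReaders; ++next) inflight.push_back(Prefetch(next));
     for (int i = 0; i < kRowGroups; ++i) {
       total += inflight[i % kReaders].get();   // "DecodeRowGroups"
       if (next < kRowGroups)                   // refill this reader slot
         inflight[i % kReaders] = Prefetch(next++);
     }
     assert(total == 150);  // 10 * (0+1+2+3+4+5)
     return 0;
   }
   ```
   
   At any moment at most kReaders row groups are buffered, so memory stays bounded while IO for the next group overlaps with decoding the current one.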
   
   In both scenarios the application code never blocks on a Future, provided the custom RandomAccessFile implementation correctly caches data delivered by (any) async IO library and serves it once it is ready.
   
   
   ### Component(s)
   
   C++


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@arrow.apache.org.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] Hor911 commented on issue #34460: [C++] Split arrow::FileReader::ReadRowGroups() to 2 methods for flexible async IO

Posted by "Hor911 (via GitHub)" <gi...@apache.org>.
Hor911 commented on issue #34460:
URL: https://github.com/apache/arrow/issues/34460#issuecomment-1455218992

   Here is an implementation of the two new methods, plus a slightly reworked ReadRowGroups() to deduplicate the shared logic. Tested with large and complex Parquet files, and it works very well.
   
   ```
   Status FileReaderImpl::WillNeedRowGroups(const std::vector<int>& row_groups,
                                            const std::vector<int>& column_indices) {
     RETURN_NOT_OK(BoundsCheck(row_groups, column_indices));
   
     // PARQUET-1698/PARQUET-1820: pre-buffer row groups/column chunks if enabled
     if (reader_properties_.pre_buffer()) {
       BEGIN_PARQUET_CATCH_EXCEPTIONS
       parquet_reader()->PreBuffer(row_groups, column_indices,
                                   reader_properties_.io_context(),
                                   reader_properties_.cache_options());
       END_PARQUET_CATCH_EXCEPTIONS
     }
     return Status::OK();
   }
   
   Status FileReaderImpl::DecodeRowGroups(const std::vector<int>& row_groups,
                                          const std::vector<int>& column_indices,
                                          std::shared_ptr<::arrow::Table>* out) {
     RETURN_NOT_OK(BoundsCheck(row_groups, column_indices));
   
     auto fut = DecodeRowGroups(/*self=*/nullptr, row_groups, column_indices,
                                /*cpu_executor=*/nullptr);
     ARROW_ASSIGN_OR_RAISE(*out, fut.MoveResult());
     return Status::OK();
   }
   
   Status FileReaderImpl::ReadRowGroups(const std::vector<int>& row_groups,
                                        const std::vector<int>& column_indices,
                                        std::shared_ptr<Table>* out) {
     RETURN_NOT_OK(WillNeedRowGroups(row_groups, column_indices));
   
     auto fut = DecodeRowGroups(/*self=*/nullptr, row_groups, column_indices,
                                /*cpu_executor=*/nullptr);
     ARROW_ASSIGN_OR_RAISE(*out, fut.MoveResult());
     return Status::OK();
   }
   ```




[GitHub] [arrow] wjones127 commented on issue #34460: [C++] Split arrow::FileReader::ReadRowGroups() to 2 methods for flexible async IO

Posted by "wjones127 (via GitHub)" <gi...@apache.org>.
wjones127 commented on issue #34460:
URL: https://github.com/apache/arrow/issues/34460#issuecomment-1456574854

   IIRC we've discussed in the past that we'd like to separate IO-bound and CPU-bound tasks so they can be run on separate thread pools, as described in https://jorgecarleitao.medium.com/how-to-efficiently-load-data-to-memory-d65ee359196c
   
   It seems this is related to that work, but I'm not familiar enough with the current async functionality to be sure.




[GitHub] [arrow] westonpace commented on issue #34460: [C++] Split arrow::FileReader::ReadRowGroups() to 2 methods for flexible async IO

Posted by "westonpace (via GitHub)" <gi...@apache.org>.
westonpace commented on issue #34460:
URL: https://github.com/apache/arrow/issues/34460#issuecomment-1457015836

   There is a way to sort of do this today with `FileReader::GetRecordBatchGenerator`.  However, it was one of our first attempts at async support and I think it duplicates a lot of code.
   
   What you are describing here (`WillNeed` and `Decode`) sounds more similar to what we ended up adopting in the IPC reader (e.g. `PreBufferMetadata`).  Note: what you have right now is not tested, so it's a bit tricky to understand how exactly you plan to use it.  It's not clear to me how you would use this without blocking a CPU thread or doing CPU work on an I/O thread.

