Posted to github@beam.apache.org by "chrisstockton (via GitHub)" <gi...@apache.org> on 2023/02/11 00:28:47 UTC

[GitHub] [beam] chrisstockton opened a new issue, #25435: [Feature Request]: Add optional reshuffle stop to ReadAllFromParquet

chrisstockton opened a new issue, #25435:
URL: https://github.com/apache/beam/issues/25435

   ### What would you like to happen?
   
   We run a Python Beam batch pipeline in Cloud Dataflow that reads a large number of files from a list of GCS file patterns.
   
   Something like:
   ```
   input_files = ['gs://bucket/my_files_1*.parquet', 'gs://bucket/my_files_2*.parquet']
   beam.Create(input_files) | beam.io.ReadAllFromParquet(...)
   ```
   Each pattern in the list expands to several thousand files, resulting in a "high fan-out detected" warning in the Cloud Dataflow UI.
   
   The warning suggests adding a step between the `ReadAllFiles` and `_ArrowTableToRowDictionaries` steps to improve performance:
   
   ```
   ReadAllFiles/ReadAllFiles/ExpandIntoRanges, ReadEvents/ReadAllFiles/ReadAllFiles/ExpandIntoRanges, ReadEvents/ParDo(_ArrowTableToRowDictionaries)
   ```
   
   However, those steps are internal to the `ReadAllFromParquet` [transform](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/parquetio.py#L267-L268):
   ```
     def expand(self, pvalue):
       return pvalue | Read(self._source) | ParDo(_ArrowTableToRowDictionaries())
   ```
   
   Because `Read(self._source)` goes from a few input file patterns to tens of thousands of actual files, adding a `beam.Reshuffle()` step between `Read(self._source)` and `ParDo(_ArrowTableToRowDictionaries())` would improve performance, according to the [Cloud Dataflow docs](https://cloud.google.com/dataflow/docs/guides/using-dataflow-insights#high-fan-out).
   
   Adding a configurable reshuffle step:
   
   ```
   beam.io.ReadAllFromParquet(..., reshuffle_after_read_files=True)
   
     def expand(self, pvalue):
       files = pvalue | Read(self._source)
       if self._reshuffle_after_read_files:
         # Break fusion so the expanded files are redistributed across workers.
         files = files | beam.Reshuffle()

       return files | ParDo(_ArrowTableToRowDictionaries())
   ```
   should help improve performance when reading from input patterns that expand into a large number of actual files.
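   
   In the meantime, a rough user-side workaround sketch (just an assumption on our side, not something the Dataflow docs prescribe for this IO) would be to expand the patterns up front with `fileio.MatchAll`, reshuffle the matched file paths, and only then hand them to `ReadAllFromParquet`. That spreads the per-file read work across workers, although it leaves the internal `ExpandIntoRanges` step untouched:
   
   ```
   import apache_beam as beam
   from apache_beam.io import fileio
   from apache_beam.io.parquetio import ReadAllFromParquet
   
   input_files = ['gs://bucket/my_files_1*.parquet', 'gs://bucket/my_files_2*.parquet']
   
   with beam.Pipeline() as p:
       _ = (
           p
           | 'Patterns' >> beam.Create(input_files)
           # Expand each glob into individual FileMetadata records.
           | 'MatchAll' >> fileio.MatchAll()
           | 'FilePaths' >> beam.Map(lambda metadata: metadata.path)
           # Redistribute the file paths across workers before the heavy read work.
           | 'Reshuffle' >> beam.Reshuffle()
           | 'ReadParquet' >> ReadAllFromParquet()
       )
   ```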
   
   ### Issue Priority
   
   Priority: 3 (nice-to-have improvement)
   
   ### Issue Components
   
   - [X] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [X] Component: IO connector
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner

