Posted to jira@arrow.apache.org by "David Li (Jira)" <ji...@apache.org> on 2021/07/05 12:41:00 UTC

[jira] [Comment Edited] (ARROW-11781) [Python] Reading small amount of files from a partitioned dataset is unexpectedly slow

    [ https://issues.apache.org/jira/browse/ARROW-11781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17374798#comment-17374798 ] 

David Li edited comment on ARROW-11781 at 7/5/21, 12:40 PM:
------------------------------------------------------------

I think we can close it for now, as it's tracked in benchmarks and saw some improvements in 4.0.0. There are also some further potential improvements linked. If there are still issues, we can reopen it or open a new issue.


was (Author: lidavidm):
I think we can close it for now, as it's tracked in benchmarks and saw some improvements in 4.0.0. If there are still issues, we can reopen it or open a new issue.

> [Python] Reading small amount of files from a partitioned dataset is unexpectedly slow
> --------------------------------------------------------------------------------------
>
>                 Key: ARROW-11781
>                 URL: https://issues.apache.org/jira/browse/ARROW-11781
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Python
>            Reporter: Jeroen
>            Priority: Minor
>             Fix For: 5.0.0
>
>         Attachments: spy.svg, spy2.svg, spy3.svg
>
>
> I posted this on StackOverflow and was told I should probably create an issue here.
> I managed to create a relatively minimal example:
> {code:python}
> # Imports used by this snippet (not shown in the original report); `spark` is an
> # existing SparkSession and `data_dir` a pathlib.Path pointing at the output directory.
> import random
> import time
>
> import pandas as pd
> import pyarrow as pa
> import pyarrow.dataset
>
> df = spark.createDataFrame(
>     [
>         (str(a), b, c, random.randint(0, 1000))
>         for a in range(100)
>         for b in range(10)
>         for c in range(10000)
>     ],
>     ['a', 'b', 'c', 'd']
> )
>
> print("Writing the spark dataframe to the file system in partitioned folders.")
> df.repartition('a').write.partitionBy('a', 'b').parquet(str(data_dir), compression='snappy', mode='overwrite')
>
>
> def time_it(func, repetition=10):
>     start = time.time()
>     for _ in range(repetition):
>         func()
>     return (time.time() - start) / repetition
>
>
> print("Loading the entire dataset")
> print(time_it(lambda: pd.read_parquet(data_dir, engine='pyarrow')))
>
> print("Loading a single file using filters")
> print(time_it(lambda: pd.read_parquet(data_dir, engine='pyarrow', filters=[[('a', '=', '0'), ('b', '=', '0')]])))
>
> print("Loading a single file using filters and a specified partitioning")
> partitioning = pa.dataset.HivePartitioning(
>     pa.schema([
>         pa.field('a', pa.string()),
>         pa.field('b', pa.string())
>     ])
> )
> print(time_it(lambda: pd.read_parquet(data_dir, engine='pyarrow', filters=[[('a', '=', '0'), ('b', '=', '0')]], partitioning=partitioning)))
>
> print("Loading a single file by specifying the path")
> print(time_it(lambda: pd.read_parquet(data_dir / 'a=0' / 'b=0', engine='pyarrow')))
> {code}
> Which gives me the following output:
> {code}
> Writing the spark dataframe to the file system in partitioned folders.
> Loading the entire dataset
> 0.23926825523376466
> Loading a single file using filters
> 0.04788286685943603
> Loading a single file using filters and a specified partitioning
> 0.0323061466217041
> Loading a single file by specifying the path
> 0.0017130613327026368
> {code}
>  
> Loading a small number of files is about 20 times faster if you address the paths directly, compared to using the pyarrow filters.
>  
> The question as I posted it on StackOverflow:
> I am having some problems with the speed of loading `.parquet` files. However, I don't know what I am doing wrong.
> *Problem*
> I am trying to read a single `.parquet` file from my local filesystem, which is the partitioned output of a Spark job, such that there are `.parquet` files in hierarchical directories named `a=x` and `b=y`.
> To achieve this, I am using `pandas.read_parquet` (which uses `pyarrow.parquet.read_table`), for which I include the `filters` kwarg. The run time when using `filters` is much longer than I would expect.
> {code:python}
> # The following runs for about 55 seconds
> pd.read_parquet(<path_to_entire_dataset>, filters=[[('a', '=', 'x'), ('b', '=', 'y')]])
> # The following runs for about 0.04 seconds
> pd.read_parquet(<path_to_entire_dataset>/a=x/b=y/)
> # The following runs for about 70 seconds
> pd.read_parquet(<path_to_entire_dataset>)
> {code}
> Reading a single parquet file by specifying filters is only slightly faster than loading the entire dataset, whereas I would expect a run time approximately linear in the number of files read.
> *What mistake am I making here?*
> I realize that simply putting the filters in the path would work; however, this will quickly become complex, as what I want to filter on can change. Besides, I think `read_table` should be able to load this data efficiently.
> PS: The entire dataset contains many millions of rows; the data I want to load is only a few thousand rows.
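> A minimal sketch of the path-based workaround described above; the `read_partition` helper is hypothetical and only covers exact matches on partition columns:
> {code:python}
> from pathlib import Path
>
> import pandas as pd
>
>
> def read_partition(root, **partition_values):
>     # Hypothetical helper: build '<root>/a=x/b=y' from the requested partition
>     # values and read just that directory (exact matches only).
>     path = Path(root)
>     for column, value in partition_values.items():
>         path = path / f"{column}={value}"
>     return pd.read_parquet(path, engine='pyarrow')
>
>
> # e.g. read_partition('<path_to_entire_dataset>', a='0', b='0')
> {code}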
> *Edit 1:*
>  As suggested by 0x26res, I manually defined the partitioning. This led to a significant speed-up, but still not as much as I would have expected; in this situation the run time was about 5 seconds.
> {code:python}
> partitioning = HivePartitioning(
>     pa.schema([
>         pa.field('a', pa.string()),
>         pa.field('b', pa.int32()),
>     ])
> )
> pd.read_parquet(
>     <path_to_entire_dataset>,
>     engine='pyarrow',
>     filters=[
>         [
>             ('a', '=', x),
>             ('b', '=', y),
>         ]
>     ],
>     partitioning=partitioning
> )
> {code}
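> For reference, a minimal sketch of the same filtered read expressed directly through the `pyarrow.dataset` API, assuming the hive directory layout and the column types from Edit 1; the path is a placeholder:
> {code:python}
> import pyarrow as pa
> import pyarrow.dataset as ds
>
> # Declare the hive-style partitioning up front instead of letting it be inferred.
> partitioning = ds.partitioning(
>     pa.schema([
>         pa.field('a', pa.string()),
>         pa.field('b', pa.int32()),
>     ]),
>     flavor='hive',
> )
>
> # '<path_to_entire_dataset>' stands in for the root of the partitioned output.
> dataset = ds.dataset('<path_to_entire_dataset>', format='parquet', partitioning=partitioning)
>
> # Only fragments whose partition values match the filter need to be read.
> table = dataset.to_table(filter=(ds.field('a') == '0') & (ds.field('b') == 0))
> df = table.to_pandas()
> {code}
> This is the same idea as the manual `HivePartitioning` in Edit 1, just expressed at the dataset level with a filter expression instead of the `filters` kwarg.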



--
This message was sent by Atlassian Jira
(v8.3.4#803005)