You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@arrow.apache.org by "Weston Pace (Jira)" <ji...@apache.org> on 2021/02/25 17:48:00 UTC

[jira] [Updated] (ARROW-11781) [Python] Reading small amount of files from a partitioned dataset is unexpectedly slow

     [ https://issues.apache.org/jira/browse/ARROW-11781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Weston Pace updated ARROW-11781:
--------------------------------
    Summary: [Python] Reading small amount of files from a partitioned dataset is unexpectedly slow  (was: Reading small amount of files from a partitioned dataset is unexpectedly slow)

> [Python] Reading small amount of files from a partitioned dataset is unexpectedly slow
> --------------------------------------------------------------------------------------
>
>                 Key: ARROW-11781
>                 URL: https://issues.apache.org/jira/browse/ARROW-11781
>             Project: Apache Arrow
>          Issue Type: Bug
>            Reporter: Jeroen
>            Priority: Minor
>
> I posted this on StackOverflow and was told I should probably create an issue here.
> I managed to create a relative minimal example:
> {code:java}
>     df = spark.createDataFrame(
>         [
>             (str(a), b, c, random.randint(0, 1000))
>             for a in range(100)
>             for b in range(10)
>             for c in range(10000)
>         ],
>         ['a', 'b', 'c', 'd']
>     )    
> print("Writing the spark dataframe to the file system in partitioned folders.")
>     df.repartition('a').write.partitionBy('a', 'b').parquet(str(data_dir), compression='snappy', mode='overwrite')    
> def time_it(func, repetition=10):
>         start = time.time()
>         for _ in range(repetition):
>             func()
>         return (time.time() - start) / repetition    
> print("Loading the entire dataset")
> print(time_it(lambda: pd.read_parquet(data_dir, engine='pyarrow')))    
> print("Loading a single file using filters")
> print(time_it(lambda: pd.read_parquet(data_dir, engine='pyarrow', filters=[[('a', '=', '0'), ('b', '=', '0')]])))    
> print("Loading a single file using filters and a specified partitioning")
>     partitioning = pa.dataset.HivePartitioning(
>         pa.schema([
>             pa.field('a', pa.string()),
>             pa.field('b', pa.string())
>         ])
>     )
> print(time_it(lambda: pd.read_parquet(data_dir, engine='pyarrow', filters=[[('a', '=', '0'), ('b', '=', '0')]], partitioning=partitioning)))    
> print("Loading a single file by specifying the path")
> print(time_it(lambda: pd.read_parquet(data_dir / 'a=0' / 'b=0', engine='pyarrow')))
> {code}
> Which gives me the following output:
> {code:java}
> Writing the spark dataframe to the file system in partitioned folders.
> Loading the entire dataset
> 0.23926825523376466
> Loading a single file using filters
> 0.04788286685943603
> Loading a single file using filters and a specified partitioning
> 0.0323061466217041
> Loading a single file by specifying the path
> 0.0017130613327026368
> {code}
>  
> Loading the small amount of files is about 20 times faster if you address the paths directly, compared to the pyarrow filters.
>  
> The question as I posted it on StackOverflow:
> I am having some problems with the speed of loading `.parquet` files. However, I don't know what I am doing wrong.
> *Problem*
> I am trying to read a single `.parquet` file from from my local filesystem which is the partitioned output from a spark job. Such that there are `.parquet` files in hierarchical directories named `a=x` and `b=y`.
> To achieve this, I am using `pandas.read_parquet` (which uses `pyarrow.parquet.read_table`) for which I include the `filters` kwarg. The run time of using the `filters` is way longer than I would expect.
> {code:java}
> # The following runs for about 55 seconds
> pd.read_parquet(<path_to_entire_dataset>, filters=[[('a', '=', 'x'), ('b', '=', 'y')]])
> # The following runs for about 0.04 seconds
> pd.read_parquet(<path_to_entire_dataset>/a=x/b=y/)
> # The following runs for about 70 seconds
> pd.read_parquet(<path_to_entire_dataset>){code}
> Reading a single parquet file by specifying filters is only slightly faster than loading the entire dataset, where I would expect a run time approximately linear in the amount of files.
> *What mistake do I make here?*
> I realize that simply putting the filters in the path would work, however this will quickly become complex as what I want to filter on will / can change. Besides, I think `read_table` should be able to load this data efficiently.
> PS: The entire dataset contains many millions of rows, the data I want to load is only a few thousand rows.
> *Edit 1:*
>  As suggested by 0x26res I manually defined the partitioning, this lead to a significant speed up, but still not as much as I would have expected. In this situation the run time was about 5 seconds.
> {code:java}
> partitioning = HivePartitioning(
>  pa.schema([
>  pa.field('a', pa.string()),
>  pa.field('b', pa.int32()),
>  ])
> )
> pd.read_parquet(
>  <path_to_entire_dataset>,
>  engine='pyarrow',
>  filters=[
>  [
>  ('a', '=', x),
>  ('b', '=', y),
>  ]
>  ],
>  partitioning=partitioning
> )
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)