Posted to jira@arrow.apache.org by "Jeroen (Jira)" <ji...@apache.org> on 2021/02/25 13:42:00 UTC

[jira] [Created] (ARROW-11781) Reading small amount of files from a partitioned dataset is unexpectedly slow

Jeroen created ARROW-11781:
------------------------------

             Summary: Reading small amount of files from a partitioned dataset is unexpectedly slow
                 Key: ARROW-11781
                 URL: https://issues.apache.org/jira/browse/ARROW-11781
             Project: Apache Arrow
          Issue Type: Bug
            Reporter: Jeroen


I posted this on StackOverflow and was told I should probably create an issue here.

I managed to create a relatively minimal example:
{code:java}
import random
import time

import pandas as pd
import pyarrow as pa
import pyarrow.dataset

# `spark` is an existing SparkSession and `data_dir` is a pathlib.Path
# pointing at the dataset directory.
df = spark.createDataFrame(
    [
        (str(a), b, c, random.randint(0, 1000))
        for a in range(100)
        for b in range(10)
        for c in range(10000)
    ],
    ['a', 'b', 'c', 'd']
)

print("Writing the spark dataframe to the file system in partitioned folders.")
df.repartition('a').write.partitionBy('a', 'b').parquet(
    str(data_dir), compression='snappy', mode='overwrite')


def time_it(func, repetition=10):
    # Average wall-clock time of `func` over `repetition` calls.
    start = time.time()
    for _ in range(repetition):
        func()
    return (time.time() - start) / repetition


print("Loading the entire dataset")
print(time_it(lambda: pd.read_parquet(data_dir, engine='pyarrow')))

print("Loading a single file using filters")
print(time_it(lambda: pd.read_parquet(
    data_dir, engine='pyarrow',
    filters=[[('a', '=', '0'), ('b', '=', '0')]])))

print("Loading a single file using filters and a specified partitioning")
partitioning = pa.dataset.HivePartitioning(
    pa.schema([
        pa.field('a', pa.string()),
        pa.field('b', pa.string())
    ])
)
print(time_it(lambda: pd.read_parquet(
    data_dir, engine='pyarrow',
    filters=[[('a', '=', '0'), ('b', '=', '0')]],
    partitioning=partitioning)))

print("Loading a single file by specifying the path")
print(time_it(lambda: pd.read_parquet(data_dir / 'a=0' / 'b=0', engine='pyarrow')))
{code}
This gives me the following output:
{code:java}
Writing the spark dataframe to the file system in partitioned folders.
21/02/25 14:37:58 WARN TaskSetManager: Stage 0 contains a task of very large size (18240 KiB). The maximum recommended task size is 1000 KiB.
Loading the entire dataset
0.23926825523376466
Loading a single file using filters
0.04788286685943603
Loading a single file using filters and a specified partitioning
0.0323061466217041
Loading a single file by specifying the path
0.0017130613327026368
{code}
 

Loading a small number of files is about 20 times faster when you address the paths directly than when you use the pyarrow filters.
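For what it's worth, the same lookup can be expressed directly through the `pyarrow.dataset` API, which, as far as I understand, is roughly what the `filters` code path uses internally. A minimal sketch, assuming pyarrow >= 3.0 and the same `data_dir` and partition schema as in the example above:
{code:java}
import pyarrow as pa
import pyarrow.dataset as ds

# Explicit hive partitioning, matching the schema used in the example above.
partitioning = ds.HivePartitioning(
    pa.schema([
        pa.field('a', pa.string()),
        pa.field('b', pa.string())
    ])
)

# Discover the dataset once, then read only the matching partition.
dataset = ds.dataset(str(data_dir), format='parquet', partitioning=partitioning)
table = dataset.to_table(filter=(ds.field('a') == '0') & (ds.field('b') == '0'))
df = table.to_pandas()
{code}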

 

The question as I posted it on StackOverflow:



I am having problems with the speed of loading `.parquet` files, but I don't know what I am doing wrong.

*Problem*

I am trying to read a single `.parquet` file from my local filesystem, which holds the partitioned output of a Spark job, so the `.parquet` files sit in hierarchical directories named `a=x` and `b=y`.

To achieve this, I am using `pandas.read_parquet` (which uses `pyarrow.parquet.read_table` under the hood) and pass it the `filters` kwarg. The run time when using `filters` is far longer than I would expect.
{code:java}
# The following runs for about 55 seconds
pd.read_parquet(<path_to_entire_dataset>, filters=[[('a', '=', 'x'), ('b', '=', 'y')]])
# The following runs for about 0.04 seconds
pd.read_parquet(<path_to_entire_dataset>/a=x/b=y/)
# The following runs for about 70 seconds
pd.read_parquet(<path_to_entire_dataset>)
{code}
Reading a single parquet file by specifying filters is only slightly faster than loading the entire dataset, whereas I would expect a run time roughly linear in the number of files read.


*What mistake am I making here?*

I realize that simply putting the filters in the path would work, but this quickly becomes complex, since what I want to filter on can and will change. Besides, I think `read_table` should be able to load this data efficiently.
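For clarity, this is the kind of direct `read_table` call I have in mind; a sketch using the same placeholder path and filter values as above (if I understand correctly, `pandas.read_parquet` with `engine='pyarrow'` forwards `filters` to this call):
{code:java}
import pyarrow.parquet as pq

# Same placeholder path and filter values as in the snippet above.
table = pq.read_table(
    <path_to_entire_dataset>,
    filters=[[('a', '=', 'x'), ('b', '=', 'y')]],
)
df = table.to_pandas()
{code}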

PS: The entire dataset contains many millions of rows; the data I want to load is only a few thousand rows.


*Edit 1:*
As suggested by 0x26res, I manually defined the partitioning. This led to a significant speed-up, but still not as much as I would have expected; in this situation the run time was about 5 seconds.
{code:java}
partitioning = HivePartitioning(
    pa.schema([
        pa.field('a', pa.string()),
        pa.field('b', pa.int32()),
    ])
)
pd.read_parquet(
    <path_to_entire_dataset>,
    engine='pyarrow',
    filters=[
        [
            ('a', '=', x),
            ('b', '=', y),
        ]
    ],
    partitioning=partitioning
)
{code}
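To narrow down where those ~5 seconds go, a sketch like the following could separate dataset discovery from the filtered read (it reuses the `partitioning` object and the `x` / `y` / `<path_to_entire_dataset>` placeholders from the snippet above):
{code:java}
import time
import pyarrow.dataset as ds

# Step 1: discover the files and partition structure.
start = time.time()
dataset = ds.dataset(<path_to_entire_dataset>, format='parquet',
                     partitioning=partitioning)
print("discovery:", time.time() - start)

# Step 2: read only the rows matching the partition filter.
start = time.time()
table = dataset.to_table(filter=(ds.field('a') == x) & (ds.field('b') == y))
print("filtered read:", time.time() - start)
{code}
If the discovery step dominates, reusing the `dataset` object across queries would amortize that cost.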



--
This message was sent by Atlassian Jira
(v8.3.4#803005)